diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index 9f93cbadcb27..f70e22093d30 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -4,9 +4,11 @@ ### Features Added -- Exported log.Event constants for azservicebus. This will maek them easier to - discover and they are also documented. The text of log messages themselves - are not guaranteed to be stable. (#TBD) +- Exported log.Event constants for azservicebus. This will make them easier to + discover and they are also documented. NOTE: The log messages themselves + are not guaranteed to be stable. (#17596) +- `admin.Client` can now manage authorization rules and subscription filters and + actions. (#17616) ### Breaking Changes diff --git a/sdk/messaging/azservicebus/admin/admin_client.go b/sdk/messaging/azservicebus/admin/admin_client.go index 6159695713bf..15b9df7af1a9 100644 --- a/sdk/messaging/azservicebus/admin/admin_client.go +++ b/sdk/messaging/azservicebus/admin/admin_client.go @@ -5,6 +5,7 @@ package admin import ( "context" + "errors" "fmt" "net/http" "time" @@ -214,3 +215,22 @@ func (ep *entityPager[TFeed, T, TOutput]) Fetcher(ctx context.Context) ([]TOutpu return finalItems, nil } + +// mapATOMError checks if the error is a legitimate 404 or a "fake" 404 (where the service succeeded but gave us back an +// empty feed instead). This "fake" behavior comes about because the API here is not truly a CRUD API (it's extremely close) +// so we have to do some small workarounds. +// NOTE: we had a debate about whether to return a nil instance or try to fabricate an HTTP 404 response instead (even if +// one didn't come back) and went with 'nil' to avoid having a fake HTTP response, which would have been confusing. +func mapATOMError[T any](err error) (*T, error) { + if errors.Is(err, atom.ErrFeedEmpty) { + return nil, nil + } + + var respError *azcore.ResponseError + + if errors.As(err, &respError) && respError.StatusCode == http.StatusNotFound { + return nil, nil + } + + return nil, err +} diff --git a/sdk/messaging/azservicebus/admin/admin_client_models.go b/sdk/messaging/azservicebus/admin/admin_client_models.go index f1fc7be7bc30..df7052e7a1cb 100644 --- a/sdk/messaging/azservicebus/admin/admin_client_models.go +++ b/sdk/messaging/azservicebus/admin/admin_client_models.go @@ -3,6 +3,12 @@ package admin +import ( + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/atom" +) + // EntityStatus represents the current status of the entity. type EntityStatus string @@ -16,3 +22,87 @@ const ( // EntityStatusReceiveDisabled indicates that an entity cannot be used for receiving. EntityStatusReceiveDisabled EntityStatus = "ReceiveDisabled" ) + +type ( + // AccessRight is an access right (Manage, Send, Listen) for an AuthorizationRule. + AccessRight string + + // AuthorizationRule is a rule with keys and rights associated with an entity. + AuthorizationRule struct { + // AccessRights for this rule. + AccessRights []AccessRight + + // KeyName for this rule. + KeyName *string + + // CreatedTime for this rule. + CreatedTime *time.Time + + // ModifiedTime for this rule. + ModifiedTime *time.Time + + // PrimaryKey for this rule. + PrimaryKey *string + + // SecondaryKey for this rule. + SecondaryKey *string + } +) + +const ( + // AccessRightManage allows changes to an entity. + AccessRightManage AccessRight = "Manage" + // AccessRightSend allows you to send messages to this entity. + AccessRightSend AccessRight = "Send" + // AccessRightListen allows you to receive messages from this entity. + AccessRightListen AccessRight = "Listen" +) + +func internalAccessRightsToPublic(internalRules []atom.AuthorizationRule) []AuthorizationRule { + var rules []AuthorizationRule + + for _, rule := range internalRules { + var accessRights []AccessRight + + for _, right := range rule.Rights { + accessRights = append(accessRights, AccessRight(right)) + } + + rules = append(rules, AuthorizationRule{ + AccessRights: accessRights, + KeyName: rule.KeyName, + CreatedTime: rule.CreatedTime, + ModifiedTime: rule.ModifiedTime, + PrimaryKey: rule.PrimaryKey, + SecondaryKey: rule.SecondaryKey, + }) + } + + return rules +} + +func publicAccessRightsToInternal(rules []AuthorizationRule) []atom.AuthorizationRule { + var internalRules []atom.AuthorizationRule + + for _, rule := range rules { + var accessRights []string + + for _, right := range rule.AccessRights { + accessRights = append(accessRights, string(right)) + } + + internalRules = append(internalRules, atom.AuthorizationRule{ + Type: "SharedAccessAuthorizationRule", + ClaimType: "SharedAccessKey", + ClaimValue: "None", + Rights: accessRights, + KeyName: rule.KeyName, + CreatedTime: rule.CreatedTime, + ModifiedTime: rule.ModifiedTime, + PrimaryKey: rule.PrimaryKey, + SecondaryKey: rule.SecondaryKey, + }) + } + + return internalRules +} diff --git a/sdk/messaging/azservicebus/admin/admin_client_queue.go b/sdk/messaging/azservicebus/admin/admin_client_queue.go index 07536d7c27ae..a3061bade991 100644 --- a/sdk/messaging/azservicebus/admin/admin_client_queue.go +++ b/sdk/messaging/azservicebus/admin/admin_client_queue.go @@ -5,7 +5,6 @@ package admin import ( "context" - "errors" "net/http" "time" @@ -72,6 +71,9 @@ type QueueProperties struct { // UserMetadata is custom metadata that user can associate with the queue. UserMetadata *string + + // AuthorizationRules are the authorization rules for this entity. + AuthorizationRules []AuthorizationRule } // QueueRuntimeProperties represent dynamic properties of a queue, such as the ActiveMessageCount. @@ -178,11 +180,7 @@ func (ac *Client) GetQueue(ctx context.Context, queueName string, options *GetQu _, err := ac.em.Get(ctx, "/"+queueName, &atomResp) if err != nil { - if errors.Is(err, atom.ErrFeedEmpty) { - return nil, nil - } - - return nil, err + return mapATOMError[GetQueueResponse](err) } queueItem, err := newQueueItem(atomResp) @@ -213,11 +211,7 @@ func (ac *Client) GetQueueRuntimeProperties(ctx context.Context, queueName strin _, err := ac.em.Get(ctx, "/"+queueName, &atomResp) if err != nil { - if errors.Is(err, atom.ErrFeedEmpty) { - return nil, nil - } - - return nil, err + return mapATOMError[GetQueueRuntimePropertiesResponse](err) } item, err := newQueueRuntimePropertiesItem(atomResp) @@ -264,8 +258,8 @@ type QueueItem struct { QueueProperties } -// ListQueues lists queues. -func (ac *Client) ListQueues(options *ListQueuesOptions) *runtime.Pager[ListQueuesResponse] { +// NewListQueuesPager creates a pager that can be used to list queues. +func (ac *Client) NewListQueuesPager(options *ListQueuesOptions) *runtime.Pager[ListQueuesResponse] { var pageSize int32 if options != nil { @@ -314,8 +308,8 @@ type QueueRuntimePropertiesItem struct { QueueRuntimeProperties } -// ListQueuesRuntimeProperties lists runtime properties for queues. -func (ac *Client) ListQueuesRuntimeProperties(options *ListQueuesRuntimePropertiesOptions) *runtime.Pager[ListQueuesRuntimePropertiesResponse] { +// NewListQueuesRuntimePropertiesPager creates a pager that lists the runtime properties for queues. +func (ac *Client) NewListQueuesRuntimePropertiesPager(options *ListQueuesRuntimePropertiesOptions) *runtime.Pager[ListQueuesRuntimePropertiesResponse] { var pageSize int32 if options != nil { @@ -397,6 +391,7 @@ func newQueueEnvelope(props *QueueProperties, tokenProvider auth.TokenProvider) ForwardTo: props.ForwardTo, ForwardDeadLetteredMessagesTo: props.ForwardDeadLetteredMessagesTo, UserMetadata: props.UserMetadata, + AuthorizationRules: publicAccessRightsToInternal(props.AuthorizationRules), } return atom.WrapWithQueueEnvelope(qpr, tokenProvider) @@ -421,6 +416,7 @@ func newQueueItem(env *atom.QueueEnvelope) (*QueueItem, error) { ForwardTo: desc.ForwardTo, ForwardDeadLetteredMessagesTo: desc.ForwardDeadLetteredMessagesTo, UserMetadata: desc.UserMetadata, + AuthorizationRules: internalAccessRightsToPublic(desc.AuthorizationRules), } return &QueueItem{ diff --git a/sdk/messaging/azservicebus/admin/admin_client_rules.go b/sdk/messaging/azservicebus/admin/admin_client_rules.go new file mode 100644 index 000000000000..e7224924a334 --- /dev/null +++ b/sdk/messaging/azservicebus/admin/admin_client_rules.go @@ -0,0 +1,735 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package admin + +import ( + "context" + "encoding/xml" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/atom" +) + +// Rule specifies a message filter and action for a subscription. +type Rule struct { + // Filter is the filter that will be used for Rule. + // Valid types: *SQLFilter, *CorrelationFilter, *FalseFilter, *TrueFilter + Filter RuleFilter + + // Action is the action that will be used for Rule. + // Valid types: *SQLAction + Action RuleAction +} + +// RuleFilter is a filter for a subscription rule. +// Implemented by: *SQLFilter, *CorrelationFilter, *FalseFilter, *TrueFilter +type RuleFilter interface { + ruleFilter() +} + +// RuleAction is an action for a subscription rule. +// Implemented by: *SQLAction +type RuleAction interface { + ruleAction() +} + +// SQLAction is an action that updates a message according to its +// expression. +type SQLAction struct { + // Expression is a SQL Expression + Expression string + + // Parameters is a map of string to values of type string, number, or boolean. + Parameters map[string]interface{} +} + +func (a *SQLAction) ruleAction() {} + +// UnknownRuleAction is an action type not yet handled by this SDK. +// If you get this type back you should update to a newer version of the SDK +// which properly represents this type. +type UnknownRuleAction struct { + // Type is the Service Bus type for this action. + Type string + + // RawXML is the raw XML for this action that could not be parsed. + RawXML []byte +} + +func (a *UnknownRuleAction) ruleAction() {} + +// SQLFilter is a filter that evaluates to true for any message that matches +// its expression. +type SQLFilter struct { + // Expression is a SQL Expression + Expression string + + // Parameters is a map of string to values of type string, number, or boolean. + Parameters map[string]interface{} +} + +func (f *SQLFilter) ruleFilter() {} + +// TrueFilter is a filter that always evaluates to true for any message. +type TrueFilter struct{} + +func (f *TrueFilter) ruleFilter() {} + +// FalseFilter is a filter that always evaluates to false for any message. +type FalseFilter struct{} + +func (f *FalseFilter) ruleFilter() {} + +// CorrelationFilter represents a set of conditions that are matched against user +// and system properties of messages for a subscription. +type CorrelationFilter struct { + // ApplicationProperties will be matched against the application properties for the message. + ApplicationProperties map[string]interface{} + + // ContentType will be matched against the ContentType property for the message. + ContentType *string + + // CorrelationID will be matched against the CorrelationID property for the message. + CorrelationID *string + + // MessageID will be matched against the MessageID property for the message. + MessageID *string + + // ReplyTo will be matched against the ReplyTo property for the message. + ReplyTo *string + + // ReplyToSessionID will be matched against the ReplyToSessionID property for the message. + ReplyToSessionID *string + + // SessionID will be matched against the SessionID property for the message. + SessionID *string + + // Subject will be matched against the Subject property for the message. + Subject *string + + // To will be matched against the To property for the message. + To *string +} + +func (f *CorrelationFilter) ruleFilter() {} + +// UnknownRuleFilter is a filter type not yet handled by this SDK. +// If you get this type back you should update to a newer version of the SDK +// which properly represents this type. +type UnknownRuleFilter struct { + // Type is the Service Bus type for this filter. + Type string + + // RawXML is the raw XML for this rule that could not be parsed. + RawXML []byte +} + +func (f *UnknownRuleFilter) ruleFilter() {} + +// RuleProperties are the properties for a rule. +type RuleProperties struct { + // Name is the name of this rule. + Name string + + // Filter is the filter that will be used for Rule. + // Valid types: *SQLFilter, *CorrelationFilter, *FalseFilter, *TrueFilter + Filter RuleFilter + + // Action is the action that will be used for Rule. + // Valid types: *SQLAction + Action RuleAction +} + +// CreateRuleResponse contains the response fields for Client.CreateRule +type CreateRuleResponse struct { + RuleProperties +} + +// CreateRuleOptions contains the optional parameters for Client.CreateRule +type CreateRuleOptions struct { + // Name is the name of the rule or nil, which will default to $Default + Name *string + + // Filter is the filter that will be used for Rule. + // Valid types: *SQLFilter, *CorrelationFilter, *FalseFilter, *TrueFilter + Filter RuleFilter + + // Action is the action that will be used for Rule. + // Valid types: *SQLAction + Action RuleAction +} + +// CreateRule creates a rule that can filter and update message for a subscription. +func (ac *Client) CreateRule(ctx context.Context, topicName string, subscriptionName string, options *CreateRuleOptions) (CreateRuleResponse, error) { + ruleName := "" + + if options != nil && options.Name != nil { + ruleName = *options.Name + } + + resp, _, err := ac.createOrUpdateRule(ctx, topicName, subscriptionName, RuleProperties{ + Name: ruleName, + Filter: options.Filter, + Action: options.Action, + }, true) + + if err != nil { + return CreateRuleResponse{}, err + } + + return CreateRuleResponse{RuleProperties: *resp}, nil +} + +// GetRuleResponse contains the response fields for Client.GetRule +type GetRuleResponse struct { + // RuleProperties for the rule. + RuleProperties +} + +// GetRuleOptions contains the optional parameters for Client.GetRule +type GetRuleOptions struct { + // For future expansion +} + +// GetRule gets a rule for a subscription. +func (ac *Client) GetRule(ctx context.Context, topicName string, subscriptionName string, ruleName string, options *GetRuleOptions) (*GetRuleResponse, error) { + var ruleEnv *atom.RuleEnvelope + + _, err := ac.em.Get(ctx, fmt.Sprintf("/%s/Subscriptions/%s/Rules/%s", topicName, subscriptionName, ruleName), &ruleEnv) + + if err != nil { + return mapATOMError[GetRuleResponse](err) + } + + props, err := ac.newRuleProperties(ruleEnv) + + if err != nil { + return nil, err + } + + return &GetRuleResponse{ + RuleProperties: *props, + }, nil +} + +// ListRulesResponse contains the response fields for the pager returned from Client.ListRules. +type ListRulesResponse struct { + // Rules are all the rules for the page. + Rules []RuleProperties +} + +// ListRulesOptions contains the optional parameters for Client.ListRules +type ListRulesOptions struct { + // MaxPageSize is the maximum size of each page of results. + MaxPageSize int32 +} + +// NewListRulesPager creates a pager that can list rules for a subscription. +func (ac *Client) NewListRulesPager(topicName string, subscriptionName string, options *ListRulesOptions) *runtime.Pager[ListRulesResponse] { + var pageSize int32 + + if options != nil { + pageSize = options.MaxPageSize + } + + ep := &entityPager[atom.RuleFeed, atom.RuleEnvelope, RuleProperties]{ + convertFn: ac.newRuleProperties, + baseFragment: fmt.Sprintf("/%s/Subscriptions/%s/Rules/", topicName, subscriptionName), + maxPageSize: pageSize, + em: ac.em, + } + + return runtime.NewPager(runtime.PageProcessor[ListRulesResponse]{ + More: func(ltr ListRulesResponse) bool { + return ep.More() + }, + Fetcher: func(ctx context.Context, t *ListRulesResponse) (ListRulesResponse, error) { + items, err := ep.Fetcher(ctx) + + if err != nil { + return ListRulesResponse{}, err + } + + return ListRulesResponse{ + Rules: items, + }, nil + }, + }) +} + +// UpdateRuleResponse contains the response fields for Client.UpdateRule +type UpdateRuleResponse struct { + // RuleProperties for the updated rule. + RuleProperties +} + +// UpdateRuleOptions can be used to configure the UpdateRule method. +type UpdateRuleOptions struct { + // For future expansion +} + +// UpdateRule updates a rule for a subscription. +func (ac *Client) UpdateRule(ctx context.Context, topicName string, subscriptionName string, properties RuleProperties) (UpdateRuleResponse, error) { + resp, _, err := ac.createOrUpdateRule(ctx, topicName, subscriptionName, properties, false) + + if err != nil { + return UpdateRuleResponse{}, err + } + + return UpdateRuleResponse{RuleProperties: *resp}, nil +} + +// DeleteRuleResponse contains the response fields for Client.DeleteRule +type DeleteRuleResponse struct { + // For future expansion +} + +// DeleteRuleOptions can be used to configure the Client.DeleteRule method. +type DeleteRuleOptions struct { + // For future expansion +} + +// DeleteRule deletes a rule for a subscription. +func (ac *Client) DeleteRule(ctx context.Context, topicName string, subscriptionName string, ruleName string, options *DeleteRuleOptions) (DeleteRuleResponse, error) { + _, err := ac.em.Delete(ctx, fmt.Sprintf("/%s/Subscriptions/%s/Rules/%s", topicName, subscriptionName, ruleName)) + + return DeleteRuleResponse{}, err +} + +func (ac *Client) createOrUpdateRule(ctx context.Context, topicName string, subscriptionName string, putProps RuleProperties, creating bool) (*RuleProperties, *http.Response, error) { + ruleDesc := atom.RuleDescription{} + + theirFilter := putProps.Filter + + if theirFilter != nil { + switch actualFilter := theirFilter.(type) { + case *FalseFilter: + ruleDesc.Filter = &atom.FilterDescription{ + Type: "FalseFilter", + SQLExpression: to.Ptr("1=0"), + } + case *TrueFilter: + ruleDesc.Filter = &atom.FilterDescription{ + Type: "TrueFilter", + SQLExpression: to.Ptr("1=1"), + } + case *SQLFilter: + params, err := publicSQLParametersToInternal(actualFilter.Parameters) + + if err != nil { + return nil, nil, err + } + + ruleDesc.Filter = &atom.FilterDescription{ + Type: "SqlFilter", + SQLExpression: &actualFilter.Expression, + Parameters: params, + } + case *CorrelationFilter: + appProps, err := publicSQLParametersToInternal(actualFilter.ApplicationProperties) + + if err != nil { + return nil, nil, err + } + + ruleDesc.Filter = &atom.FilterDescription{ + Type: "CorrelationFilter", + CorrelationFilter: atom.CorrelationFilter{ + ContentType: actualFilter.ContentType, + CorrelationID: actualFilter.CorrelationID, + MessageID: actualFilter.MessageID, + ReplyTo: actualFilter.ReplyTo, + ReplyToSessionID: actualFilter.ReplyToSessionID, + SessionID: actualFilter.SessionID, + Label: actualFilter.Subject, + To: actualFilter.To, + Properties: appProps, + }, + } + case *UnknownRuleFilter: + fd, err := convertUnknownRuleFilterToFilterDescription(actualFilter) + + if err != nil { + return nil, nil, err + } + + ruleDesc.Filter = fd + default: + return nil, nil, fmt.Errorf("invalid type ('%T') for Rule.Filter", theirFilter) + } + } else { + ruleDesc.Filter = &atom.FilterDescription{ + Type: "TrueFilter", + SQLExpression: to.Ptr("1=1"), + } + } + + theirAction := putProps.Action + + if theirAction != nil { + switch actualAction := theirAction.(type) { + case *SQLAction: + params, err := publicSQLParametersToInternal(actualAction.Parameters) + + if err != nil { + return nil, nil, err + } + + ruleDesc.Action = &atom.ActionDescription{ + Type: "SqlRuleAction", + SQLExpression: actualAction.Expression, + Parameters: params, + } + case *UnknownRuleAction: + ad, err := convertUnknownRuleActionToActionDescription(actualAction) + + if err != nil { + return nil, nil, err + } + + ruleDesc.Action = ad + default: + return nil, nil, fmt.Errorf("invalid type ('%T') for Rule.Action", theirAction) + } + } + + ruleDesc.Name = "$Default" + + if putProps.Name != "" { + ruleDesc.Name = putProps.Name + } + + var mw []atom.MiddlewareFunc + + if !creating { + // an update requires the entity to already exist. + mw = append(mw, func(next atom.RestHandler) atom.RestHandler { + return func(ctx context.Context, req *http.Request) (*http.Response, error) { + req.Header.Set("If-Match", "*") + return next(ctx, req) + } + }) + } + + putEnv := atom.WrapWithRuleEnvelope(&ruleDesc) + + var respEnv *atom.RuleEnvelope + + httpResp, err := ac.em.Put(ctx, fmt.Sprintf("/%s/Subscriptions/%s/Rules/%s", topicName, subscriptionName, putProps.Name), putEnv, &respEnv, mw...) + + if err != nil { + return nil, nil, err + } + + respProps, err := ac.newRuleProperties(respEnv) + + return respProps, httpResp, err +} + +func (ac *Client) newRuleProperties(env *atom.RuleEnvelope) (*RuleProperties, error) { + desc := env.Content.RuleDescription + + props := RuleProperties{ + Name: env.Title, + } + + switch desc.Filter.Type { + case "TrueFilter": + props.Filter = &TrueFilter{} + case "FalseFilter": + props.Filter = &FalseFilter{} + case "CorrelationFilter": + cf := desc.Filter.CorrelationFilter + + appProps, err := internalSQLParametersToPublic(cf.Properties) + + if err != nil { + return nil, err + } + + props.Filter = &CorrelationFilter{ + ContentType: cf.ContentType, + CorrelationID: cf.CorrelationID, + MessageID: cf.MessageID, + ReplyTo: cf.ReplyTo, + ReplyToSessionID: cf.ReplyToSessionID, + SessionID: cf.SessionID, + Subject: cf.Label, + To: cf.To, + ApplicationProperties: appProps, + } + case "SqlFilter": + params, err := internalSQLParametersToPublic(desc.Filter.Parameters) + + if err != nil { + return nil, err + } + + props.Filter = &SQLFilter{ + Expression: *desc.Filter.SQLExpression, + Parameters: params, + } + default: + urf, err := newUnknownRuleFilterFromFilterDescription(desc.Filter) + + if err != nil { + return nil, err + } + + props.Filter = urf + } + + const emptyRuleAction = "EmptyRuleAction" + + switch desc.Action.Type { + case emptyRuleAction: + case "SqlRuleAction": + params, err := internalSQLParametersToPublic(desc.Action.Parameters) + + if err != nil { + return nil, err + } + + props.Action = &SQLAction{ + Expression: desc.Action.SQLExpression, + Parameters: params, + } + default: + ura, err := newUnknownRuleActionFromActionDescription(desc.Action) + + if err != nil { + return nil, err + } + + props.Action = ura + } + + return &props, nil +} + +func publicSQLParametersToInternal(publicParams map[string]interface{}) (*atom.KeyValueList, error) { + if len(publicParams) == 0 { + return nil, nil + } + + var params []atom.KeyValueOfstringanyType + + for k, v := range publicParams { + switch asType := v.(type) { + case string: + params = append(params, atom.KeyValueOfstringanyType{ + Key: k, + Value: atom.Value{ + Type: "l28:string", + L28NS: "http://www.w3.org/2001/XMLSchema", + Text: asType, + }, + }) + case bool: + params = append(params, atom.KeyValueOfstringanyType{ + Key: k, + Value: atom.Value{ + Type: "l28:boolean", + L28NS: "http://www.w3.org/2001/XMLSchema", + Text: fmt.Sprintf("%t", v), + }, + }) + case int, int64, int32: + params = append(params, atom.KeyValueOfstringanyType{ + Key: k, + Value: atom.Value{ + Type: "l28:int", + L28NS: "http://www.w3.org/2001/XMLSchema", + Text: fmt.Sprintf("%d", v), + }, + }) + case float32, float64: + params = append(params, atom.KeyValueOfstringanyType{ + Key: k, + Value: atom.Value{ + Type: "l28:double", + L28NS: "http://www.w3.org/2001/XMLSchema", + Text: fmt.Sprintf("%f", v), + }, + }) + case time.Time: + params = append(params, atom.KeyValueOfstringanyType{ + Key: k, + Value: atom.Value{ + Type: "l28:dateTime", + L28NS: "http://www.w3.org/2001/XMLSchema", + Text: asType.UTC().Format(time.RFC3339Nano), + }, + }) + default: + // TODO: 'duration' + return nil, fmt.Errorf("type %T of parameter %s is not a handled type for SQL parameters", v, k) + } + } + + return &atom.KeyValueList{KeyValues: params}, nil +} + +func internalSQLParametersToPublic(kvlist *atom.KeyValueList) (map[string]interface{}, error) { + if kvlist == nil { + return nil, nil + } + + params := map[string]interface{}{} + + for _, p := range kvlist.KeyValues { + switch p.Value.Type { + case "d6p1:string": + params[p.Key] = p.Value.Text + case "d6p1:boolean": + val, err := strconv.ParseBool(p.Value.Text) + + if err != nil { + return nil, err + } + + params[p.Key] = val + case "d6p1:int": + val, err := strconv.ParseInt(p.Value.Text, 10, 64) + + if err != nil { + return nil, err + } + + params[p.Key] = val + case "d6p1:double": + val, err := strconv.ParseFloat(p.Value.Text, 64) + + if err != nil { + return nil, err + } + + params[p.Key] = val + case "d6p1:dateTime": + val, err := time.Parse(time.RFC3339Nano, p.Value.Text) + + if err != nil { + return nil, err + } + + params[p.Key] = val.UTC() + default: + // TODO: timespan + return nil, fmt.Errorf("type %s of parameter %s is not a handled type for SQL parameters", p.Value.Type, p.Key) + } + } + + if len(params) == 0 { + return nil, nil + } + + return params, nil +} + +func newUnknownRuleFilterFromFilterDescription(fd *atom.FilterDescription) (*UnknownRuleFilter, error) { + attrs := fd.RawAttrs + + // 'type' gets parsed since it's one of the standard fields. Since we want to present + // the full filter XML we'll re-add it. + attrs = append(attrs, xml.Attr{ + Name: xml.Name{ + Local: "i:type", + }, Value: fd.Type, + }) + + userFacingXML := struct { + XMLName xml.Name `xml:"Filter"` + Attrs []xml.Attr `xml:",any,attr"` + XML []byte `xml:",innerxml"` + }{ + Attrs: attrs, + XML: fd.RawXML, + } + + xmlBytes, err := xml.Marshal(userFacingXML) + + if err != nil { + return nil, err + } + + return &UnknownRuleFilter{ + Type: fd.Type, + RawXML: xmlBytes, + }, nil +} + +func convertUnknownRuleFilterToFilterDescription(urf *UnknownRuleFilter) (*atom.FilterDescription, error) { + var fdXML struct { + Type string `xml:"i type,attr"` + Attrs []xml.Attr `xml:",any,attr"` + XML []byte `xml:",innerxml"` + } + + if err := xml.Unmarshal([]byte(urf.RawXML), &fdXML); err != nil { + return nil, err + } + + return &atom.FilterDescription{ + Type: fdXML.Type, + RawAttrs: fdXML.Attrs, + RawXML: fdXML.XML, + }, nil +} + +func newUnknownRuleActionFromActionDescription(ad *atom.ActionDescription) (*UnknownRuleAction, error) { + attrs := ad.RawAttrs + + // 'type' gets parsed since it's one of the standard fields. Since we want to present + // the full filter XML we'll re-add it. + attrs = append(attrs, xml.Attr{ + Name: xml.Name{ + Local: "i:type", + }, Value: ad.Type, + }) + + userFacingXML := struct { + XMLName xml.Name `xml:"Action"` + Attrs []xml.Attr `xml:",any,attr"` + XML []byte `xml:",innerxml"` + }{ + Attrs: attrs, + XML: ad.RawXML, + } + + xmlBytes, err := xml.Marshal(userFacingXML) + + if err != nil { + return nil, err + } + + return &UnknownRuleAction{ + Type: ad.Type, + RawXML: xmlBytes, + }, nil +} + +// convertUnknownRuleActionToActionDescription creates an atom.ActionDescription. +// This XML was originally +func convertUnknownRuleActionToActionDescription(urf *UnknownRuleAction) (*atom.ActionDescription, error) { + var adXML struct { + Type string `xml:"i type,attr"` + Attrs []xml.Attr `xml:",any,attr"` + XML []byte `xml:",innerxml"` + } + + if err := xml.Unmarshal([]byte(urf.RawXML), &adXML); err != nil { + return nil, err + } + + return &atom.ActionDescription{ + Type: adXML.Type, + RawXML: adXML.XML, + RawAttrs: adXML.Attrs, + }, nil +} diff --git a/sdk/messaging/azservicebus/admin/admin_client_subscription.go b/sdk/messaging/azservicebus/admin/admin_client_subscription.go index e5eb48a1bc62..6973d5698903 100644 --- a/sdk/messaging/azservicebus/admin/admin_client_subscription.go +++ b/sdk/messaging/azservicebus/admin/admin_client_subscription.go @@ -5,12 +5,10 @@ package admin import ( "context" - "errors" "fmt" "net/http" "time" - "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/atom" "github.com/Azure/azure-sdk-for-go/sdk/messaging/internal/auth" @@ -140,17 +138,7 @@ func (ac *Client) GetSubscription(ctx context.Context, topicName string, subscri _, err := ac.em.Get(ctx, fmt.Sprintf("/%s/Subscriptions/%s", topicName, subscriptionName), &atomResp) if err != nil { - if errors.Is(err, atom.ErrFeedEmpty) { - return nil, nil - } - - var respError *azcore.ResponseError - - if errors.As(err, &respError) && respError.StatusCode == http.StatusNotFound { - return nil, nil - } - - return nil, err + return mapATOMError[GetSubscriptionResponse](err) } item, err := newSubscriptionItem(atomResp, topicName) @@ -181,17 +169,7 @@ func (ac *Client) GetSubscriptionRuntimeProperties(ctx context.Context, topicNam _, err := ac.em.Get(ctx, fmt.Sprintf("/%s/Subscriptions/%s", topicName, subscriptionName), &atomResp) if err != nil { - if errors.Is(err, atom.ErrFeedEmpty) { - return nil, nil - } - - var respError *azcore.ResponseError - - if errors.As(err, &respError) && respError.StatusCode == http.StatusNotFound { - return nil, nil - } - - return nil, err + return mapATOMError[GetSubscriptionRuntimePropertiesResponse](err) } item, err := newSubscriptionRuntimePropertiesItem(atomResp, topicName) @@ -225,8 +203,8 @@ type ListSubscriptionsResponse struct { Subscriptions []SubscriptionPropertiesItem } -// ListSubscriptions lists subscriptions for a topic. -func (ac *Client) ListSubscriptions(topicName string, options *ListSubscriptionsOptions) *runtime.Pager[ListSubscriptionsResponse] { +// NewListSubscriptionsPager creates a pager than can list subscriptions for a topic. +func (ac *Client) NewListSubscriptionsPager(topicName string, options *ListSubscriptionsOptions) *runtime.Pager[ListSubscriptionsResponse] { var pageSize int32 if options != nil { @@ -280,8 +258,8 @@ type ListSubscriptionsRuntimePropertiesResponse struct { SubscriptionRuntimeProperties []SubscriptionRuntimePropertiesItem } -// ListSubscriptionsRuntimeProperties lists runtime properties for subscriptions for a topic. -func (ac *Client) ListSubscriptionsRuntimeProperties(topicName string, options *ListSubscriptionsRuntimePropertiesOptions) *runtime.Pager[ListSubscriptionsRuntimePropertiesResponse] { +// NewListSubscriptionsRuntimePropertiesPager creates a pager than can list runtime properties for subscriptions for a topic. +func (ac *Client) NewListSubscriptionsRuntimePropertiesPager(topicName string, options *ListSubscriptionsRuntimePropertiesOptions) *runtime.Pager[ListSubscriptionsRuntimePropertiesResponse] { var pageSize int32 if options != nil { diff --git a/sdk/messaging/azservicebus/admin/admin_client_test.go b/sdk/messaging/azservicebus/admin/admin_client_test.go index 552066cfad09..f6bed4ed70a1 100644 --- a/sdk/messaging/azservicebus/admin/admin_client_test.go +++ b/sdk/messaging/azservicebus/admin/admin_client_test.go @@ -4,10 +4,15 @@ package admin import ( + "bytes" "context" + cryptoRand "crypto/rand" + "encoding/hex" + "encoding/xml" "fmt" "net/http" "os" + "sort" "strings" "sync" "testing" @@ -19,6 +24,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/atom" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/internal/auth" "github.com/stretchr/testify/require" ) @@ -75,6 +81,8 @@ func TestAdminClient_QueueWithMaxValues(t *testing.T) { es := EntityStatusReceiveDisabled + authRules := createAuthorizationRulesForTest(t) + queueName := fmt.Sprintf("queue-%X", time.Now().UnixNano()) _, err = adminClient.CreateQueue(context.Background(), queueName, &CreateQueueOptions{ @@ -94,6 +102,7 @@ func TestAdminClient_QueueWithMaxValues(t *testing.T) { Status: &es, AutoDeleteOnIdle: MaxTimeSpanForTests, UserMetadata: to.Ptr("some metadata"), + AuthorizationRules: authRules, }, }) require.NoError(t, err) @@ -118,6 +127,7 @@ func TestAdminClient_QueueWithMaxValues(t *testing.T) { Status: &es, AutoDeleteOnIdle: MaxTimeSpanForTests, UserMetadata: to.Ptr("some metadata"), + AuthorizationRules: authRules, }, resp.QueueProperties) runtimeResp, err := adminClient.GetQueueRuntimeProperties(context.Background(), queueName, nil) @@ -199,10 +209,13 @@ func TestAdminClient_UpdateQueue(t *testing.T) { }() createdProps.MaxDeliveryCount = to.Ptr(int32(101)) + createdProps.QueueProperties.AuthorizationRules = createAuthorizationRulesForTest(t) + updatedProps, err := adminClient.UpdateQueue(context.Background(), queueName, createdProps.QueueProperties, nil) require.NoError(t, err) require.EqualValues(t, 101, *updatedProps.MaxDeliveryCount) + require.EqualValues(t, createdProps.QueueProperties.AuthorizationRules, updatedProps.AuthorizationRules) // try changing a value that's not allowed updatedProps.RequiresSession = to.Ptr(true) @@ -240,7 +253,7 @@ func TestAdminClient_ListQueues(t *testing.T) { } // we skipped the first queue so it shouldn't come back in the results. - pager := adminClient.ListQueues(nil) + pager := adminClient.NewListQueuesPager(nil) all := map[string]QueueItem{} for pager.More() { @@ -282,7 +295,7 @@ func TestAdminClient_ListQueuesRuntimeProperties(t *testing.T) { } // we skipped the first queue so it shouldn't come back in the results. - pager := adminClient.ListQueuesRuntimeProperties(&ListQueuesRuntimePropertiesOptions{ + pager := adminClient.NewListQueuesRuntimePropertiesPager(&ListQueuesRuntimePropertiesOptions{ MaxPageSize: 2, }) all := map[string]QueueRuntimePropertiesItem{} @@ -478,10 +491,17 @@ func TestAdminClient_UpdateTopic(t *testing.T) { defer deleteTopic(t, adminClient, topicName) addResp.AutoDeleteOnIdle = to.Ptr("PT11M") + addResp.AuthorizationRules = createAuthorizationRulesForTest(t) + updateResp, err := adminClient.UpdateTopic(context.Background(), topicName, addResp.TopicProperties, nil) require.NoError(t, err) require.EqualValues(t, "PT11M", *updateResp.AutoDeleteOnIdle) + require.EqualValues(t, addResp.AuthorizationRules, updateResp.AuthorizationRules) + + getResp, err := adminClient.GetTopic(context.Background(), topicName, nil) + require.NoError(t, err) + require.Equal(t, getResp.TopicProperties, updateResp.TopicProperties) // try changing a value that's not allowed updateResp.EnablePartitioning = to.Ptr(true) @@ -527,7 +547,7 @@ func TestAdminClient_ListTopics(t *testing.T) { wg.Wait() // we skipped the first topic so it shouldn't come back in the results. - pager := adminClient.ListTopics(&ListTopicsOptions{ + pager := adminClient.NewListTopicsPager(&ListTopicsOptions{ MaxPageSize: 2, }) all := map[string]TopicItem{} @@ -580,7 +600,7 @@ func TestAdminClient_ListTopicsRuntimeProperties(t *testing.T) { times := 0 // we skipped the first topic so it shouldn't come back in the results. - pager := adminClient.ListTopicsRuntimeProperties(&ListTopicsRuntimePropertiesOptions{ + pager := adminClient.NewListTopicsRuntimePropertiesPager(&ListTopicsRuntimePropertiesOptions{ MaxPageSize: 2, }) all := map[string]TopicRuntimePropertiesItem{} @@ -640,7 +660,7 @@ func TestAdminClient_ListSubscriptions(t *testing.T) { } // we skipped the first topic so it shouldn't come back in the results. - pager := adminClient.ListSubscriptions(topicName, &ListSubscriptionsOptions{ + pager := adminClient.NewListSubscriptionsPager(topicName, &ListSubscriptionsOptions{ MaxPageSize: 2, }) all := map[string]SubscriptionPropertiesItem{} @@ -696,7 +716,7 @@ func TestAdminClient_ListSubscriptionRuntimeProperties(t *testing.T) { } // we skipped the first subscription so it shouldn't come back in the results. - pager := adminClient.ListSubscriptionsRuntimeProperties(topicName, &ListSubscriptionsRuntimePropertiesOptions{ + pager := adminClient.NewListSubscriptionsRuntimePropertiesPager(topicName, &ListSubscriptionsRuntimePropertiesOptions{ MaxPageSize: 2, }) all := map[string]SubscriptionRuntimePropertiesItem{} @@ -790,7 +810,7 @@ func TestAdminClient_LackPermissions_Queue(t *testing.T) { require.ErrorAs(t, err, &re) require.EqualValues(t, 401, re.StatusCode) - pager := testData.Client.ListQueues(nil) + pager := testData.Client.NewListQueuesPager(nil) page, err := pager.NextPage(context.Background()) require.Empty(t, page.Queues) require.Contains(t, err.Error(), "Manage,EntityRead claims required for this operation") @@ -834,7 +854,7 @@ func TestAdminClient_LackPermissions_Topic(t *testing.T) { require.ErrorAs(t, err, &asResponseErr) require.EqualValues(t, 401, asResponseErr.StatusCode) - pager := testData.Client.ListTopics(nil) + pager := testData.Client.NewListTopicsPager(nil) _, err = pager.NextPage(context.Background()) require.Contains(t, err.Error(), ">Manage,EntityRead claims required for this operation") require.ErrorAs(t, err, &asResponseErr) @@ -915,7 +935,7 @@ func TestAdminClient_LackPermissions_Subscription(t *testing.T) { _, err = testData.Client.GetSubscription(ctx, testData.TopicName, testData.SubName, nil) require.Contains(t, err.Error(), "401 SubCode=40100: Unauthorized : Unauthorized access for 'GetSubscription'") - pager := testData.Client.ListSubscriptions(testData.TopicName, nil) + pager := testData.Client.NewListSubscriptionsPager(testData.TopicName, nil) _, err = pager.NextPage(context.Background()) require.Contains(t, err.Error(), "401 SubCode=40100: Unauthorized : Unauthorized access for 'EnumerateSubscriptions' operation") @@ -929,6 +949,455 @@ func TestAdminClient_LackPermissions_Subscription(t *testing.T) { require.Contains(t, err.Error(), "401 SubCode=40100: Unauthorized : Unauthorized access for 'DeleteSubscription'") } +func TestAdminClient_CreateRules(t *testing.T) { + adminClient, topicName := createTestSub(t) + defer func() { + _, err := adminClient.DeleteTopic(context.Background(), topicName, nil) + require.NoError(t, err) + }() + + t.Run("ruleThatDoesNotExist", func(t *testing.T) { + getResp, err := adminClient.GetRule(context.Background(), topicName, "sub", "non-existent-rule", nil) + require.NoError(t, err) + require.Nil(t, getResp) + }) + + // (simple all in one - create the rule, update the rule, get the rule and make sure it + // all does the right stuff. + assertRuleCRUD := func(t *testing.T, rp RuleProperties) { + defer func() { + resp, err := adminClient.DeleteRule(context.Background(), topicName, "sub", rp.Name, nil) + require.NoError(t, err) + require.NotNil(t, resp) + }() + + createdRule, err := adminClient.CreateRule(context.Background(), topicName, "sub", &CreateRuleOptions{ + Name: &rp.Name, + Filter: rp.Filter, + Action: rp.Action, + }) + require.NoError(t, err, fmt.Sprintf("Created rule %s", rp.Name)) + + if rp.Filter == nil { + // Service Bus will automatically add in a 'TrueFilter' to our + // rule. We'll add it to our local copy just for assert purposes. + rp.Filter = &TrueFilter{} + } + + require.Equal(t, createdRule, CreateRuleResponse{ + RuleProperties: rp, + }, fmt.Sprintf("Created rule %s matches our rule", rp.Name)) + + updateResp, err := adminClient.UpdateRule(context.Background(), topicName, "sub", createdRule.RuleProperties) + require.NoError(t, err, fmt.Sprintf("Updated rule %s succeeds", rp.Name)) + + require.Equal(t, updateResp, UpdateRuleResponse{ + RuleProperties: rp, + }, fmt.Sprintf("Updated rule %s matches our rule", rp.Name)) + + getResp, err := adminClient.GetRule(context.Background(), topicName, "sub", rp.Name, nil) + require.NoError(t, err, fmt.Sprintf("Get rule %s succeeds", rp.Name)) + + require.Equal(t, getResp, &GetRuleResponse{ + RuleProperties: updateResp.RuleProperties, + }, fmt.Sprintf("Get rule %s matches our rule", rp.Name)) + } + + t.Run("ruleWithNoActionOrFilter", func(t *testing.T) { + assertRuleCRUD(t, RuleProperties{ + Name: "ruleWithNoActionOrFilter", + }) + }) + + t.Run("ruleWithFalseFilter", func(t *testing.T) { + assertRuleCRUD(t, RuleProperties{ + Name: "ruleWithFalseFilter", + Filter: &FalseFilter{}, + }) + }) + + t.Run("ruleWithTrueFilter", func(t *testing.T) { + assertRuleCRUD(t, RuleProperties{ + Name: "ruleWithTrueFilter", + Filter: &FalseFilter{}, + }) + }) + + t.Run("ruleWithSQLFilterNoParams", func(t *testing.T) { + assertRuleCRUD(t, RuleProperties{ + Name: "ruleWithSQLFilterNoParams", + Filter: &SQLFilter{ + Expression: "MessageID='hello'", + }, + }) + }) + + t.Run("ruleWithSQLFilterWithParams", func(t *testing.T) { + dt, err := time.Parse(time.RFC3339, "2001-01-01T01:02:03Z") + require.NoError(t, err) + + assertRuleCRUD(t, RuleProperties{ + Name: "ruleWithSQLFilterWithParams", + Filter: &SQLFilter{ + Expression: "MessageID=@stringVar OR MessageID=@intVar OR MessageID=@floatVar OR MessageID=@dateTimeVar OR MessageID=@boolVar", + Parameters: map[string]interface{}{ + "@stringVar": "hello world", + "@intVar": int64(100), + "@floatVar": float64(100.1), + "@dateTimeVar": dt, + "@boolVar": true, + }, + }, + }) + }) + + t.Run("ruleWithCorrelationFilter", func(t *testing.T) { + assertRuleCRUD(t, RuleProperties{ + Name: "ruleWithCorrelationFilter", + Filter: &CorrelationFilter{ + ContentType: to.Ptr("application/xml"), + CorrelationID: to.Ptr("correlationID"), + MessageID: to.Ptr("messageID"), + ReplyTo: to.Ptr("replyTo"), + ReplyToSessionID: to.Ptr("replyToSessionID"), + SessionID: to.Ptr("sessionID"), + Subject: to.Ptr("subject"), + To: to.Ptr("to"), + ApplicationProperties: map[string]interface{}{ + "CustomProp1": "hello", + }, + }, + }) + }) + + t.Run("ruleWithAction", func(t *testing.T) { + dt, err := time.Parse(time.RFC3339, "2001-01-01T01:02:03Z") + require.NoError(t, err) + + assertRuleCRUD(t, RuleProperties{ + Name: "ruleWithAction", + Action: &SQLAction{ + Expression: "SET MessageID=@stringVar SET MessageID=@intVar SET MessageID=@floatVar SET MessageID=@dateTimeVar SET MessageID=@boolVar", + Parameters: map[string]interface{}{ + "@stringVar": "hello world", + "@intVar": int64(100), + "@floatVar": float64(100.1), + "@dateTimeVar": dt, + "@boolVar": true, + }, + }, + }) + }) + + t.Run("ruleWithFilterAndAction", func(t *testing.T) { + dt, err := time.Parse(time.RFC3339, "2001-01-01T01:02:03Z") + require.NoError(t, err) + + assertRuleCRUD(t, RuleProperties{ + Name: "ruleWithFilterAndAction", + Filter: &SQLFilter{ + Expression: "MessageID=@stringVar OR MessageID=@intVar OR MessageID=@floatVar OR MessageID=@dateTimeVar OR MessageID=@boolVar", + Parameters: map[string]interface{}{ + "@stringVar": "hello world", + "@intVar": int64(100), + "@floatVar": float64(100.1), + "@dateTimeVar": dt, + "@boolVar": true, + }, + }, + Action: &SQLAction{ + Expression: "SET MessageID=@stringVar SET MessageID=@intVar SET MessageID=@floatVar SET MessageID=@dateTimeVar SET MessageID=@boolVar", + Parameters: map[string]interface{}{ + "@stringVar": "hello world", + "@intVar": int64(100), + "@floatVar": float64(100.1), + "@dateTimeVar": dt, + "@boolVar": true, + }, + }, + }) + }) +} + +func TestAdminClient_ListRulesWithOnlyDefault(t *testing.T) { + adminClient, topicName := createTestSub(t) + defer func() { + _, err := adminClient.DeleteTopic(context.Background(), topicName, nil) + require.NoError(t, err) + }() + + rulesPager := adminClient.NewListRulesPager(topicName, "sub", nil) + require.True(t, rulesPager.More()) + resp, err := rulesPager.NextPage(context.Background()) + require.NoError(t, err) + + require.Equal(t, []RuleProperties{ + {Name: "$Default", Filter: &TrueFilter{}}, + }, resp.Rules) + + // documenting this behavior - we let the service dictate the + // default page size so we don't know (yet) if there are any more results + // remaining. + require.True(t, rulesPager.More()) + + resp, err = rulesPager.NextPage(context.Background()) + require.NoError(t, err) + require.Empty(t, resp) + + require.False(t, rulesPager.More()) +} + +func TestAdminClient_ListRules_MaxPageSize(t *testing.T) { + adminClient, topicName := createTestSub(t) + defer func() { + _, err := adminClient.DeleteTopic(context.Background(), topicName, nil) + require.NoError(t, err) + }() + + for _, rule := range []string{"rule1", "rule2", "rule3"} { + _, err := adminClient.CreateRule(context.Background(), topicName, "sub", &CreateRuleOptions{ + Name: to.Ptr(rule), + Filter: &SQLFilter{ + Expression: fmt.Sprintf("MessageID=%s", rule), + }, + }) + require.NoError(t, err) + + defer func(rule string) { + _, err := adminClient.DeleteRule(context.Background(), topicName, "sub", rule, nil) + require.NoError(t, err) + }(rule) + } + + rulesPager := adminClient.NewListRulesPager(topicName, "sub", &ListRulesOptions{ + // there are actually 4 rules on the subscription right now - the 3 I just added + // _and_ the $Default rule, which was auto-generated when the subscription + // was created. + MaxPageSize: 3, + }) + + var all []RuleProperties + + // first page + require.True(t, rulesPager.More()) + resp, err := rulesPager.NextPage(context.Background()) + require.NoError(t, err) + require.Equal(t, 3, len(resp.Rules)) + + all = append(all, resp.Rules...) + + // second page + require.True(t, rulesPager.More()) + resp, err = rulesPager.NextPage(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, len(resp.Rules)) + + // since we explicitly configured a page size we know that this one came back + // light, so we know this is also the last page. + require.False(t, rulesPager.More()) + + all = append(all, resp.Rules...) + + sort.Slice(all, func(i, j int) bool { + return strings.Compare(all[i].Name, all[j].Name) < 0 + }) + + require.Equal(t, []RuleProperties{ + { + Name: "$Default", + Filter: &TrueFilter{}, + }, + { + Name: "rule1", + Filter: &SQLFilter{ + Expression: "MessageID=rule1", + }, + }, + { + Name: "rule2", + Filter: &SQLFilter{ + Expression: "MessageID=rule2", + }, + }, + { + Name: "rule3", + Filter: &SQLFilter{ + Expression: "MessageID=rule3", + }, + }, + }, all) +} + +func TestAdminClient_GetDefaultRule(t *testing.T) { + adminClient, topicName := createTestSub(t) + defer func() { + _, err := adminClient.DeleteTopic(context.Background(), topicName, nil) + require.NoError(t, err) + }() + + getResp, err := adminClient.GetRule(context.Background(), topicName, "sub", "$Default", nil) + require.NoError(t, err) + + // by default a subscription has a filter that lets every + // message through (ie, the TrueFilter) + require.Equal(t, getResp, &GetRuleResponse{ + RuleProperties: RuleProperties{ + Name: "$Default", + Filter: &TrueFilter{}, + }, + }) + + // switch to a filter that _rejects_ every message instead + getResp.RuleProperties.Filter = &FalseFilter{} + + updateRuleResp, err := adminClient.UpdateRule(context.Background(), topicName, "sub", getResp.RuleProperties) + require.NoError(t, err) + + require.Equal(t, updateRuleResp.RuleProperties, getResp.RuleProperties) +} + +type emwrap struct { + inner atom.EntityManager +} + +func (em *emwrap) Put(ctx context.Context, entityPath string, body interface{}, respObj interface{}, mw ...atom.MiddlewareFunc) (*http.Response, error) { + resp, err := em.inner.Put(ctx, entityPath, body, respObj, mw...) + + if err != nil { + return resp, err + } + + em.makeFilterAndActionUnknown(respObj) + return resp, err +} + +func (em *emwrap) Delete(ctx context.Context, entityPath string, mw ...atom.MiddlewareFunc) (*http.Response, error) { + return em.inner.Delete(ctx, entityPath, mw...) +} +func (em *emwrap) TokenProvider() auth.TokenProvider { return em.inner.TokenProvider() } + +func (em *emwrap) Get(ctx context.Context, entityPath string, respObj interface{}, mw ...atom.MiddlewareFunc) (*http.Response, error) { + resp, err := em.inner.Get(ctx, entityPath, respObj, mw...) + + if err != nil { + return resp, err + } + + em.makeFilterAndActionUnknown(respObj) + return resp, err +} + +func (*emwrap) makeFilterAndActionUnknown(respObj interface{}) { + switch actual := respObj.(type) { + case **atom.RuleEnvelope: + f := (*actual).Content.RuleDescription.Filter + f.Type = "PurposefullyChangedFilterType_" + f.Type + + a := (*actual).Content.RuleDescription.Action + a.Type = "PurposefullyChangedActionType_" + a.Type + } +} + +func TestAdminClient_UnknownFilterRoundtrippingWorks(t *testing.T) { + // NOTE: This test is a little weird - we basically override all "known" type handling for filters and + // actions and force them to go through our "unknown" filter handling. + // + // This allows the service to potentially upgrade in the future without breaking older clients. They get a + // relatively primitive object but they won't accidentally delete or slice filters when doing updates. + // + // Also, if they're willing to deserialize the XML themselves they can interact with filters until they + // update their azservicebus dependency. + + adminClient, topicName := createTestSub(t) + defer func() { + _, err := adminClient.DeleteTopic(context.Background(), topicName, nil) + require.NoError(t, err) + }() + + dt, err := time.Parse(time.RFC3339, "2001-01-01T01:02:03Z") + require.NoError(t, err) + + rp := RuleProperties{ + Name: "ruleWithFilterAndAction", + Filter: &SQLFilter{ + Expression: "MessageID=@stringVar OR MessageID=@intVar OR MessageID=@floatVar OR MessageID=@dateTimeVar OR MessageID=@boolVar", + Parameters: map[string]interface{}{ + "@stringVar": "hello world", + "@intVar": int64(100), + "@floatVar": float64(100.1), + "@dateTimeVar": dt, + "@boolVar": true, + }, + }, + Action: &SQLAction{ + Expression: "SET MessageID=@stringVar SET MessageID=@intVar SET MessageID=@floatVar SET MessageID=@dateTimeVar SET MessageID=@boolVar", + Parameters: map[string]interface{}{ + "@stringVar": "hello world", + "@intVar": int64(100), + "@floatVar": float64(100.1), + "@dateTimeVar": dt, + "@boolVar": true, + }, + }, + } + + origEM := adminClient.em + + adminClient.em = &emwrap{ + inner: origEM, + } + + createdRule, err := adminClient.CreateRule(context.Background(), topicName, "sub", &CreateRuleOptions{ + Name: &rp.Name, + Filter: rp.Filter, + Action: rp.Action, + }) + require.NoError(t, err, fmt.Sprintf("Created rule %s", rp.Name)) + + urf := createdRule.Filter.(*UnknownRuleFilter) + require.Regexp(t, "^"), + RawAttrs: []xml.Attr{ + { + Name: xml.Name{ + Local: "some-custom-attribute", + }, + Value: "some-custom-attribute-value", + }, + }, + }) + require.NoError(t, err) + require.Equal(t, + ``+ + ``+ + ``, string(ura.RawXML)) + require.Equal(t, "SomeNewAction", ura.Type) + + // and now the inverse + ad, err := convertUnknownRuleActionToActionDescription(ura) + require.NoError(t, err) + + require.Equal(t, "", string(ad.RawXML)) + + require.Equal(t, []xml.Attr{ + {Name: xml.Name{Local: "some-custom-attribute"}, Value: "some-custom-attribute-value"}}, ad.RawAttrs) + require.Equal(t, "SomeNewAction", string(ad.Type)) + + _, err = convertUnknownRuleActionToActionDescription(&UnknownRuleAction{ + Type: "something", + RawXML: []byte("invalid &xml"), + }) + require.Error(t, err) +} + +func TestAdminClient_unknownFilterSerde(t *testing.T) { + urf, err := newUnknownRuleFilterFromFilterDescription(&atom.FilterDescription{ + Type: "SomeNewFilter", + RawXML: []byte(""), + RawAttrs: []xml.Attr{ + { + Name: xml.Name{ + Local: "some-custom-attribute", + }, + Value: "some-custom-attribute-value", + }, + }, + }) + require.NoError(t, err) + require.Equal(t, + ``+ + ``+ + ``, string(urf.RawXML)) + require.Equal(t, "SomeNewFilter", urf.Type) + + // and now the inverse + ad, err := convertUnknownRuleFilterToFilterDescription(urf) + require.NoError(t, err) + + require.Equal(t, "", string(ad.RawXML)) + + require.Equal(t, []xml.Attr{ + {Name: xml.Name{Local: "some-custom-attribute"}, Value: "some-custom-attribute-value"}}, ad.RawAttrs) + require.Equal(t, "SomeNewFilter", string(ad.Type)) + + _, err = convertUnknownRuleFilterToFilterDescription(&UnknownRuleFilter{ + Type: "something", + RawXML: []byte("invalid &xml"), + }) + require.Error(t, err) +} + func deleteQueue(t *testing.T, ac *Client, queueName string) { _, err := ac.DeleteQueue(context.Background(), queueName, nil) require.NoError(t, err) @@ -1140,3 +1683,26 @@ func setupLowPrivTest(t *testing.T) *struct { Cleanup: cleanup, } } + +func createRandomKeyForSB(t *testing.T) string { + tempPassword := make([]byte, 22) + n, err := cryptoRand.Read(tempPassword) + require.NoError(t, err) + require.Equal(t, cap(tempPassword), n) + + return hex.EncodeToString(tempPassword) +} + +func createAuthorizationRulesForTest(t *testing.T) []AuthorizationRule { + primary := createRandomKeyForSB(t) + secondary := createRandomKeyForSB(t) + + return []AuthorizationRule{ + { + AccessRights: []AccessRight{AccessRightSend}, + KeyName: to.Ptr("keyName1"), + PrimaryKey: &primary, + SecondaryKey: &secondary, + }, + } +} diff --git a/sdk/messaging/azservicebus/admin/admin_client_topic.go b/sdk/messaging/azservicebus/admin/admin_client_topic.go index ceb0b68b722d..ac35bded4934 100644 --- a/sdk/messaging/azservicebus/admin/admin_client_topic.go +++ b/sdk/messaging/azservicebus/admin/admin_client_topic.go @@ -5,7 +5,6 @@ package admin import ( "context" - "errors" "net/http" "time" @@ -51,6 +50,9 @@ type TopicProperties struct { // UserMetadata is custom metadata that user can associate with the topic. UserMetadata *string + + // AuthorizationRules are the authorization rules for this entity. + AuthorizationRules []AuthorizationRule } // TopicRuntimeProperties represent dynamic properties of a topic, such as the ActiveMessageCount. @@ -121,11 +123,7 @@ func (ac *Client) GetTopic(ctx context.Context, topicName string, options *GetTo _, err := ac.em.Get(ctx, "/"+topicName, &atomResp) if err != nil { - if errors.Is(err, atom.ErrFeedEmpty) { - return nil, nil - } - - return nil, err + return mapATOMError[GetTopicResponse](err) } topicItem, err := newTopicItem(atomResp) @@ -157,11 +155,7 @@ func (ac *Client) GetTopicRuntimeProperties(ctx context.Context, topicName strin _, err := ac.em.Get(ctx, "/"+topicName, &atomResp) if err != nil { - if errors.Is(err, atom.ErrFeedEmpty) { - return nil, nil - } - - return nil, err + return mapATOMError[GetTopicRuntimePropertiesResponse](err) } item, err := newTopicRuntimePropertiesItem(atomResp) @@ -194,8 +188,8 @@ type ListTopicsOptions struct { MaxPageSize int32 } -// ListTopics lists topics. -func (ac *Client) ListTopics(options *ListTopicsOptions) *runtime.Pager[ListTopicsResponse] { +// NewListTopicsPager creates a pager that can list topics. +func (ac *Client) NewListTopicsPager(options *ListTopicsOptions) *runtime.Pager[ListTopicsResponse] { var pageSize int32 if options != nil { @@ -246,8 +240,8 @@ type ListTopicsRuntimePropertiesOptions struct { MaxPageSize int32 } -// ListTopicsRuntimeProperties lists runtime properties for topics. -func (ac *Client) ListTopicsRuntimeProperties(options *ListTopicsRuntimePropertiesOptions) *runtime.Pager[ListTopicsRuntimePropertiesResponse] { +// NewListTopicsRuntimePropertiesPager creates a pager than can list runtime properties for topics. +func (ac *Client) NewListTopicsRuntimePropertiesPager(options *ListTopicsRuntimePropertiesOptions) *runtime.Pager[ListTopicsRuntimePropertiesResponse] { var pageSize int32 if options != nil { @@ -368,6 +362,7 @@ func newTopicEnvelope(props *TopicProperties, tokenProvider auth.TokenProvider) SupportOrdering: props.SupportOrdering, AutoDeleteOnIdle: props.AutoDeleteOnIdle, EnablePartitioning: props.EnablePartitioning, + AuthorizationRules: publicAccessRightsToInternal(props.AuthorizationRules), } return atom.WrapWithTopicEnvelope(desc) @@ -389,6 +384,7 @@ func newTopicItem(te *atom.TopicEnvelope) (*TopicItem, error) { AutoDeleteOnIdle: td.AutoDeleteOnIdle, EnablePartitioning: td.EnablePartitioning, SupportOrdering: td.SupportOrdering, + AuthorizationRules: internalAccessRightsToPublic(td.AuthorizationRules), }, }, nil } diff --git a/sdk/messaging/azservicebus/admin/example_admin_client_test.go b/sdk/messaging/azservicebus/admin/example_admin_client_test.go index 68bd41132fb6..5b95622cadec 100644 --- a/sdk/messaging/azservicebus/admin/example_admin_client_test.go +++ b/sdk/messaging/azservicebus/admin/example_admin_client_test.go @@ -57,8 +57,8 @@ func ExampleClient_CreateQueue_usingproperties() { fmt.Printf("Lock duration: %s\n", *resp.LockDuration) } -func ExampleClient_ListQueues() { - queuePager := adminClient.ListQueues(nil) +func ExampleClient_NewListQueuesPager() { + queuePager := adminClient.NewListQueuesPager(nil) for queuePager.More() { page, err := queuePager.NextPage(context.TODO()) @@ -73,8 +73,8 @@ func ExampleClient_ListQueues() { } } -func ExampleClient_ListQueuesRuntimeProperties() { - queuePager := adminClient.ListQueuesRuntimeProperties(nil) +func ExampleClient_NewListQueuesRuntimePropertiesPager() { + queuePager := adminClient.NewListQueuesRuntimePropertiesPager(nil) for queuePager.More() { page, err := queuePager.NextPage(context.TODO()) diff --git a/sdk/messaging/azservicebus/internal/atom/action.go b/sdk/messaging/azservicebus/internal/atom/action.go deleted file mode 100644 index 9f88e48d2714..000000000000 --- a/sdk/messaging/azservicebus/internal/atom/action.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package atom - -type ( - // SQLAction represents a SQL language-based action expression that is evaluated against a BrokeredMessage. A - // SQLAction supports a subset of the SQL-92 standard. - // - // With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or - // replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL - // UPDATE statement syntax. The action is performed on the message after it has been matched and before the message - // is selected into the subscription. The changes to the message properties are private to the message copied into - // the subscription. - // - // see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter - SQLAction struct { - Expression string - } -) - -// ToActionDescription will transform the SqlAction into a ActionDescription -func (sf SQLAction) ToActionDescription() ActionDescription { - return ActionDescription{ - Type: "SqlRuleAction", - SQLExpression: sf.Expression, - } -} diff --git a/sdk/messaging/azservicebus/internal/atom/converters.go b/sdk/messaging/azservicebus/internal/atom/converters.go index 1d21637badc3..0a64583fa0f2 100644 --- a/sdk/messaging/azservicebus/internal/atom/converters.go +++ b/sdk/messaging/azservicebus/internal/atom/converters.go @@ -60,3 +60,18 @@ func WrapWithSubscriptionEnvelope(sd *SubscriptionDescription) *SubscriptionEnve }, } } + +func WrapWithRuleEnvelope(rd *RuleDescription) *RuleEnvelope { + rd.XMLNS = "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect" + rd.XMLNSI = "http://www.w3.org/2001/XMLSchema-instance" + + return &RuleEnvelope{ + Entry: &Entry{ + AtomSchema: atomSchema, + }, + Content: &RuleContent{ + Type: applicationXML, + RuleDescription: *rd, + }, + } +} diff --git a/sdk/messaging/azservicebus/internal/atom/entity_manager.go b/sdk/messaging/azservicebus/internal/atom/entity_manager.go index 4e9f12df9642..ca2f57d66f8f 100644 --- a/sdk/messaging/azservicebus/internal/atom/entity_manager.go +++ b/sdk/messaging/azservicebus/internal/atom/entity_manager.go @@ -422,12 +422,6 @@ func TraceReqAndResponseMiddleware() MiddlewareFunc { var ErrFeedEmpty = errors.New("entity does not exist") -// ptrString takes a string and returns a pointer to that string. For use in literal pointers, -// ptrString(fmt.Sprintf("..", foo)) -> *string -func ptrString(toPtr string) *string { - return &toPtr -} - // deserializeBody deserializes the body of the response into the type specified by respObj // (similar to xml.Unmarshal, which this func is calling). // If an empty feed is found, it returns nil. diff --git a/sdk/messaging/azservicebus/internal/atom/filter.go b/sdk/messaging/azservicebus/internal/atom/filter.go deleted file mode 100644 index 4d6f96b95bcd..000000000000 --- a/sdk/messaging/azservicebus/internal/atom/filter.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package atom - -type ( - // TrueFilter represents a always true sql expression which will accept all messages - TrueFilter struct{} - - // FalseFilter represents a always false sql expression which will deny all messages - FalseFilter struct{} - - // SQLFilter represents a SQL language-based filter expression that is evaluated against a BrokeredMessage. A - // SQLFilter supports a subset of the SQL-92 standard. - // - // see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter - SQLFilter struct { - Expression string - } - - // CorrelationFilter holds a set of conditions that are matched against one or more of an arriving message's user - // and system properties. A common use is to match against the CorrelationId property, but the application can also - // choose to match against ContentType, Label, MessageId, ReplyTo, ReplyToSessionId, SessionId, To, and any - // user-defined properties. A match exists when an arriving message's value for a property is equal to the value - // specified in the correlation filter. For string expressions, the comparison is case-sensitive. When specifying - // multiple match properties, the filter combines them as a logical AND condition, meaning for the filter to match, - // all conditions must match. - CorrelationFilter struct { - CorrelationID *string `xml:"CorrelationId,omitempty"` - MessageID *string `xml:"MessageId,omitempty"` - To *string `xml:"To,omitempty"` - ReplyTo *string `xml:"ReplyTo,omitempty"` - Label *string `xml:"Label,omitempty"` - SessionID *string `xml:"SessionId,omitempty"` - ReplyToSessionID *string `xml:"ReplyToSessionId,omitempty"` - ContentType *string `xml:"ContentType,omitempty"` - Properties map[string]interface{} `xml:"Properties,omitempty"` - } -) - -// ToFilterDescription will transform the TrueFilter into a FilterDescription -func (tf TrueFilter) ToFilterDescription() FilterDescription { - return FilterDescription{ - Type: "TrueFilter", - SQLExpression: ptrString("1=1"), - } -} - -// ToFilterDescription will transform the FalseFilter into a FilterDescription -func (ff FalseFilter) ToFilterDescription() FilterDescription { - return FilterDescription{ - Type: "FalseFilter", - SQLExpression: ptrString("1!=1"), - } -} - -// ToFilterDescription will transform the SqlFilter into a FilterDescription -func (sf SQLFilter) ToFilterDescription() FilterDescription { - return FilterDescription{ - Type: "SqlFilter", - SQLExpression: &sf.Expression, - } -} - -// ToFilterDescription will transform the CorrelationFilter into a FilterDescription -func (cf CorrelationFilter) ToFilterDescription() FilterDescription { - return FilterDescription{ - Type: "CorrelationFilter", - CorrelationFilter: cf, - } -} diff --git a/sdk/messaging/azservicebus/internal/atom/mgmt_test.go b/sdk/messaging/azservicebus/internal/atom/mgmt_test.go index 177c555d3a60..6350af4c8deb 100644 --- a/sdk/messaging/azservicebus/internal/atom/mgmt_test.go +++ b/sdk/messaging/azservicebus/internal/atom/mgmt_test.go @@ -5,125 +5,40 @@ package atom import ( "encoding/xml" + "fmt" + "os" "testing" + "time" "github.com/stretchr/testify/require" ) -const ( - queueDescription1 = ` - - PT1M - 1024 - false - false - P14D - false - PT10M - 10 - true - 0 - 0 - false - Active - 2018-05-04T16:38:27.913Z - 2018-05-04T16:38:41.897Z - true - P14D - false - Available - false - ` +func TestQueueUnmarshal(t *testing.T) { + bytes, err := os.ReadFile("testdata/queue.xml") + require.NoError(t, err) - queueDescription2 = ` - - PT2M - 2048 - false - false - P14D - true - PT20M - 100 - true - 256 - 23 - false - Active - 2018-05-04T16:38:27.913Z - 2018-05-04T16:38:41.897Z - true - P14D - true - Available - false - ` + var env *QueueEnvelope + err = xml.Unmarshal(bytes, &env) + require.NoError(t, err) - queueEntry1 = ` - - https://sbdjtest.servicebus.windows.net/foo - foo - 2018-05-02T20:54:59Z - 2018-05-02T20:54:59Z - - sbdjtest - - - ` + queueDescription1 + - ` - ` + require.Equal(t, "test-queue-name", env.Title) + desc := env.Content.QueueDescription - queueEntry2 = ` - - https://sbdjtest.servicebus.windows.net/bar - bar - 2018-05-02T20:54:59Z - 2018-05-02T20:54:59Z - - sbdjtest - - - ` + queueDescription2 + - ` - ` + // authorization rules is a bit special in that it's a collection with an xmlns that has to be written out + // with its type name. + require.NotEmpty(t, desc.AuthorizationRules) - feedOfQueues = ` - - Queues - https://sbdjtest.servicebus.windows.net/$Resources/Queues - 2018-05-03T00:21:15Z - ` + queueEntry1 + queueEntry2 + - `` -) + authRule := desc.AuthorizationRules[0] + require.Equal(t, "SharedAccessAuthorizationRule", authRule.Type) + require.Equal(t, "redacted-primary-key", *authRule.PrimaryKey) + require.Equal(t, "redacted-secondary-key", *authRule.SecondaryKey) + require.Equal(t, "TestSharedKeyName", *authRule.KeyName) + require.Equal(t, "2022-01-12T01:28:26.1670445Z", authRule.CreatedTime.Format(time.RFC3339Nano)) + require.Equal(t, "2022-02-12T01:28:26.1670445Z", authRule.ModifiedTime.Format(time.RFC3339Nano)) + require.Equal(t, []string{"Manage", "Listen", "Send"}, authRule.Rights) -func TestFeedUnmarshal(t *testing.T) { - var feed Feed - err := xml.Unmarshal([]byte(feedOfQueues), &feed) - require.Nil(t, err) - require.Nil(t, err) - require.Equal(t, "https://sbdjtest.servicebus.windows.net/$Resources/Queues", feed.ID) - require.Equal(t, "Queues", feed.Title) - require.EqualValues(t, 2, len(feed.Entries)) - require.NotNil(t, feed.Entries[0].Content) -} + indentedXML, err := xml.MarshalIndent(env, " ", " ") + require.NoError(t, err) -func TestEntryUnmarshal(t *testing.T) { - var entry Entry - err := xml.Unmarshal([]byte(queueEntry1), &entry) - require.Nil(t, err) - require.Equal(t, "https://sbdjtest.servicebus.windows.net/foo", entry.ID) - require.Equal(t, "foo", entry.Title) - require.Equal(t, "sbdjtest", *entry.Author.Name) - require.Equal(t, "https://sbdjtest.servicebus.windows.net/foo", entry.Link.HREF) - for _, item := range []string{ - `PT1M", - "0", - } { - require.Contains(t, entry.Content.Body, item) - } + fmt.Printf("%s\n", indentedXML) } diff --git a/sdk/messaging/azservicebus/internal/atom/models.go b/sdk/messaging/azservicebus/internal/atom/models.go index 6b6c0864976a..a38f1aeb790e 100644 --- a/sdk/messaging/azservicebus/internal/atom/models.go +++ b/sdk/messaging/azservicebus/internal/atom/models.go @@ -8,6 +8,27 @@ import ( "time" ) +// All +type ( + AuthorizationRule struct { + // Type is the type attribute, which indicates the type of AuthorizationRule + // (today this is only `SharedAccessAuthorizationRule`) + Type string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"` + + ClaimType string `xml:"ClaimType"` + ClaimValue string `xml:"ClaimValue"` + + // SharedAccessAuthorizationRule properties + Rights []string `xml:"Rights>AccessRights"` + KeyName *string `xml:"KeyName"` + CreatedTime *time.Time `xml:"CreatedTime"` + ModifiedTime *time.Time `xml:"ModifiedTime"` + + PrimaryKey *string `xml:"PrimaryKey"` + SecondaryKey *string `xml:"SecondaryKey"` + } +) + // Queues type ( // QueueEntity is the Azure Service Bus description of a Queue for management activities @@ -39,30 +60,31 @@ type ( QueueDescription struct { XMLName xml.Name `xml:"QueueDescription"` BaseEntityDescription - LockDuration *string `xml:"LockDuration,omitempty"` // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute. - MaxSizeInMegabytes *int32 `xml:"MaxSizeInMegabytes,omitempty"` // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024. - RequiresDuplicateDetection *bool `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection. - RequiresSession *bool `xml:"RequiresSession,omitempty"` - DefaultMessageTimeToLive *string `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. - DeadLetteringOnMessageExpiration *bool `xml:"DeadLetteringOnMessageExpiration,omitempty"` // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires. - DuplicateDetectionHistoryTimeWindow *string `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes. - MaxDeliveryCount *int32 `xml:"MaxDeliveryCount,omitempty"` // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10. - EnableBatchedOperations *bool `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled. - SizeInBytes *int64 `xml:"SizeInBytes,omitempty"` // SizeInBytes - The size of the queue, in bytes. - MessageCount *int64 `xml:"MessageCount,omitempty"` // MessageCount - The number of messages in the queue. - IsAnonymousAccessible *bool `xml:"IsAnonymousAccessible,omitempty"` - Status *EntityStatus `xml:"Status,omitempty"` - AccessedAt string `xml:"AccessedAt,omitempty"` - CreatedAt string `xml:"CreatedAt,omitempty"` - UpdatedAt string `xml:"UpdatedAt,omitempty"` - SupportOrdering *bool `xml:"SupportOrdering,omitempty"` - AutoDeleteOnIdle *string `xml:"AutoDeleteOnIdle,omitempty"` - EnablePartitioning *bool `xml:"EnablePartitioning,omitempty"` - EnableExpress *bool `xml:"EnableExpress,omitempty"` - CountDetails *CountDetails `xml:"CountDetails,omitempty"` - ForwardTo *string `xml:"ForwardTo,omitempty"` - ForwardDeadLetteredMessagesTo *string `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages - UserMetadata *string `xml:"UserMetadata,omitempty"` + LockDuration *string `xml:"LockDuration,omitempty"` // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute. + MaxSizeInMegabytes *int32 `xml:"MaxSizeInMegabytes,omitempty"` // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024. + RequiresDuplicateDetection *bool `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection. + RequiresSession *bool `xml:"RequiresSession,omitempty"` + DefaultMessageTimeToLive *string `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. + DeadLetteringOnMessageExpiration *bool `xml:"DeadLetteringOnMessageExpiration,omitempty"` // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires. + DuplicateDetectionHistoryTimeWindow *string `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes. + MaxDeliveryCount *int32 `xml:"MaxDeliveryCount,omitempty"` // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10. + EnableBatchedOperations *bool `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled. + SizeInBytes *int64 `xml:"SizeInBytes,omitempty"` // SizeInBytes - The size of the queue, in bytes. + MessageCount *int64 `xml:"MessageCount,omitempty"` // MessageCount - The number of messages in the queue. + IsAnonymousAccessible *bool `xml:"IsAnonymousAccessible,omitempty"` + AuthorizationRules []AuthorizationRule `xml:"AuthorizationRules>AuthorizationRule,omitempty"` + Status *EntityStatus `xml:"Status,omitempty"` + AccessedAt string `xml:"AccessedAt,omitempty"` + CreatedAt string `xml:"CreatedAt,omitempty"` + UpdatedAt string `xml:"UpdatedAt,omitempty"` + SupportOrdering *bool `xml:"SupportOrdering,omitempty"` + AutoDeleteOnIdle *string `xml:"AutoDeleteOnIdle,omitempty"` + EnablePartitioning *bool `xml:"EnablePartitioning,omitempty"` + EnableExpress *bool `xml:"EnableExpress,omitempty"` + CountDetails *CountDetails `xml:"CountDetails,omitempty"` + ForwardTo *string `xml:"ForwardTo,omitempty"` + ForwardDeadLetteredMessagesTo *string `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages + UserMetadata *string `xml:"UserMetadata,omitempty"` } ) @@ -102,26 +124,27 @@ type ( TopicDescription struct { XMLName xml.Name `xml:"TopicDescription"` BaseEntityDescription - DefaultMessageTimeToLive *string `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message time span to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. - MaxSizeInMegabytes *int32 `xml:"MaxSizeInMegabytes,omitempty"` // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024. - RequiresDuplicateDetection *bool `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection. - DuplicateDetectionHistoryTimeWindow *string `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes. - EnableBatchedOperations *bool `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled. - SizeInBytes *int64 `xml:"SizeInBytes,omitempty"` // SizeInBytes - The size of the queue, in bytes. - FilteringMessagesBeforePublishing *bool `xml:"FilteringMessagesBeforePublishing,omitempty"` - IsAnonymousAccessible *bool `xml:"IsAnonymousAccessible,omitempty"` - Status *EntityStatus `xml:"Status,omitempty"` - UserMetadata *string `xml:"UserMetadata,omitempty"` - AccessedAt string `xml:"AccessedAt,omitempty"` - CreatedAt string `xml:"CreatedAt,omitempty"` - UpdatedAt string `xml:"UpdatedAt,omitempty"` - SupportOrdering *bool `xml:"SupportOrdering,omitempty"` - AutoDeleteOnIdle *string `xml:"AutoDeleteOnIdle,omitempty"` - EnablePartitioning *bool `xml:"EnablePartitioning,omitempty"` - EnableSubscriptionPartitioning *bool `xml:"EnableSubscriptionPartitioning,omitempty"` - EnableExpress *bool `xml:"EnableExpress,omitempty"` - CountDetails *CountDetails `xml:"CountDetails,omitempty"` - SubscriptionCount *int32 `xml:"SubscriptionCount,omitempty"` + DefaultMessageTimeToLive *string `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message time span to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. + MaxSizeInMegabytes *int32 `xml:"MaxSizeInMegabytes,omitempty"` // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024. + RequiresDuplicateDetection *bool `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection. + DuplicateDetectionHistoryTimeWindow *string `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes. + EnableBatchedOperations *bool `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled. + SizeInBytes *int64 `xml:"SizeInBytes,omitempty"` // SizeInBytes - The size of the queue, in bytes. + FilteringMessagesBeforePublishing *bool `xml:"FilteringMessagesBeforePublishing,omitempty"` + IsAnonymousAccessible *bool `xml:"IsAnonymousAccessible,omitempty"` + AuthorizationRules []AuthorizationRule `xml:"AuthorizationRules>AuthorizationRule,omitempty"` + Status *EntityStatus `xml:"Status,omitempty"` + UserMetadata *string `xml:"UserMetadata,omitempty"` + AccessedAt string `xml:"AccessedAt,omitempty"` + CreatedAt string `xml:"CreatedAt,omitempty"` + UpdatedAt string `xml:"UpdatedAt,omitempty"` + SupportOrdering *bool `xml:"SupportOrdering,omitempty"` + AutoDeleteOnIdle *string `xml:"AutoDeleteOnIdle,omitempty"` + EnablePartitioning *bool `xml:"EnablePartitioning,omitempty"` + EnableSubscriptionPartitioning *bool `xml:"EnableSubscriptionPartitioning,omitempty"` + EnableExpress *bool `xml:"EnableExpress,omitempty"` + CountDetails *CountDetails `xml:"CountDetails,omitempty"` + SubscriptionCount *int32 `xml:"SubscriptionCount,omitempty"` } ) @@ -131,23 +154,16 @@ func (tf TopicFeed) Items() []TopicEnvelope { // Subscriptions (and rules) type ( - // FilterDescriber can transform itself into a FilterDescription - FilterDescriber interface { - ToFilterDescription() FilterDescription - } - - // ActionDescriber can transform itself into a ActionDescription - ActionDescriber interface { - ToActionDescription() ActionDescription - } - // RuleDescription is the content type for Subscription Rule management requests RuleDescription struct { XMLName xml.Name `xml:"RuleDescription"` + XMLNS string `xml:"xmlns,attr"` + XMLNSI string `xml:"xmlns:i,attr"` BaseEntityDescription CreatedAt string `xml:"CreatedAt,omitempty"` - Filter FilterDescription `xml:"Filter"` + Filter *FilterDescription `xml:"Filter,omitempty"` Action *ActionDescription `xml:"Action,omitempty"` + Name string `xml:"Name"` } // DefaultRuleDescription is the content type for Subscription Rule management requests DefaultRuleDescription struct { @@ -168,10 +184,19 @@ type ( // into the subscription. The default rule has no associated annotation action. FilterDescription struct { XMLName xml.Name `xml:"Filter"` + + // RawXML is any XML that wasn't covered by our known properties. + RawXML []byte `xml:",innerxml"` + + // RawAttrs are attributes for the raw XML element that wasn't covered by our known properties. + RawAttrs []xml.Attr `xml:",any,attr"` + CorrelationFilter Type string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"` SQLExpression *string `xml:"SqlExpression,omitempty"` CompatibilityLevel int `xml:"CompatibilityLevel,omitempty"` + + Parameters *KeyValueList `xml:"Parameters,omitempty"` } // ActionDescription describes an action upon a message that matches a filter @@ -182,28 +207,54 @@ type ( // is selected into the subscription. The changes to the message properties are private to the message copied into // the subscription. ActionDescription struct { - Type string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"` - SQLExpression string `xml:"SqlExpression"` - RequiresPreprocessing bool `xml:"RequiresPreprocessing"` - CompatibilityLevel int `xml:"CompatibilityLevel,omitempty"` + Type string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"` + SQLExpression string `xml:"SqlExpression,omitempty"` + Parameters *KeyValueList `xml:"Parameters,omitempty"` + + // RawXML is any XML that wasn't covered by our known properties. + RawXML []byte `xml:",innerxml"` + + // RawAttrs are attributes for the raw XML element that wasn't covered by our known properties. + RawAttrs []xml.Attr `xml:",any,attr"` + } + + Value struct { + Type string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"` + L28NS string `xml:"xmlns:l28,attr"` + Text string `xml:",chardata"` } - // RuleEntity is the Azure Service Bus description of a Subscription Rule for management activities + KeyValueOfstringanyType struct { + Key string `xml:"Key"` + Value Value `xml:"Value"` + } + + // RuleEntity is the Azure Service Bus description of a Subscription Rule for madnagement activities RuleEntity struct { *RuleDescription *Entity } - // ruleContent is a specialized Subscription body for an Atom entry - ruleContent struct { + // RuleContent is a specialized Subscription body for an Atom entry + RuleContent struct { XMLName xml.Name `xml:"content"` Type string `xml:"type,attr"` RuleDescription RuleDescription `xml:"RuleDescription"` } + TempRuleEnvelope struct { + *Entry + } + + // RuleFeed is a specialized feed containing RuleEnvelopes + RuleFeed struct { + *Feed + Entries []RuleEnvelope `xml:"entry"` + } + RuleEnvelope struct { *Entry - Content *ruleContent `xml:"content"` + Content *RuleContent `xml:"content"` } // SubscriptionDescription is the content type for Subscription management requests @@ -266,6 +317,36 @@ func (sf SubscriptionFeed) Items() []SubscriptionEnvelope { return sf.Entries } +func (rf RuleFeed) Items() []RuleEnvelope { + return rf.Entries +} + +// Filters +type ( + // CorrelationFilter holds a set of conditions that are matched against one or more of an arriving message's user + // and system properties. A common use is to match against the CorrelationId property, but the application can also + // choose to match against ContentType, Label, MessageId, ReplyTo, ReplyToSessionId, SessionId, To, and any + // user-defined properties. A match exists when an arriving message's value for a property is equal to the value + // specified in the correlation filter. For string expressions, the comparison is case-sensitive. When specifying + // multiple match properties, the filter combines them as a logical AND condition, meaning for the filter to match, + // all conditions must match. + CorrelationFilter struct { + CorrelationID *string `xml:"CorrelationId,omitempty"` + MessageID *string `xml:"MessageId,omitempty"` + To *string `xml:"To,omitempty"` + ReplyTo *string `xml:"ReplyTo,omitempty"` + Label *string `xml:"Label,omitempty"` + SessionID *string `xml:"SessionId,omitempty"` + ReplyToSessionID *string `xml:"ReplyToSessionId,omitempty"` + ContentType *string `xml:"ContentType,omitempty"` + Properties *KeyValueList `xml:"Properties,omitempty"` + } + + KeyValueList struct { + KeyValues []KeyValueOfstringanyType `xml:"KeyValueOfstringanyType,omitempty"` + } +) + type ( /* diff --git a/sdk/messaging/azservicebus/internal/atom/testdata/queue.xml b/sdk/messaging/azservicebus/internal/atom/testdata/queue.xml new file mode 100644 index 000000000000..cc727872c551 --- /dev/null +++ b/sdk/messaging/azservicebus/internal/atom/testdata/queue.xml @@ -0,0 +1,58 @@ + + https://someservicebus.servicebus.windows.net/amqptesting?api-version=2017-04 + test-queue-name + 2022-02-03T19:34:05Z + 2022-04-12T01:28:26Z + + someservicebus + + + + + PT30S + 1024 + false + false + P14D + false + PT10M + 11 + true + 542 + 3 + false + + + SharedAccessKey + None + + Manage + Listen + Send + + 2022-01-12T01:28:26.1670445Z + 2022-02-12T01:28:26.1670445Z + TestSharedKeyName + redacted-primary-key + redacted-secondary-key + + + Active + 2022-02-03T19:34:05.4983039Z + 2022-04-12T01:28:26.7163324Z + 2022-04-06T20:37:28.447Z + true + + 3 + 0 + 0 + 0 + 0 + + P10675199DT2H48M5.4775807S + false + Available + false + + + \ No newline at end of file diff --git a/sdk/messaging/azservicebus/internal/stress/tools/delete_using_regexp.go b/sdk/messaging/azservicebus/internal/stress/tools/delete_using_regexp.go index 6aa8fff2b861..51118ca5b35a 100644 --- a/sdk/messaging/azservicebus/internal/stress/tools/delete_using_regexp.go +++ b/sdk/messaging/azservicebus/internal/stress/tools/delete_using_regexp.go @@ -50,7 +50,7 @@ func DeleteUsingRegexp(remainingArgs []string) int { switch entityType { case "queue": - pager := adminClient.ListQueues(nil) + pager := adminClient.NewListQueuesPager(nil) var queuesToDelete []string @@ -80,7 +80,7 @@ func DeleteUsingRegexp(remainingArgs []string) int { } } case "topic": - pager := adminClient.ListTopics(nil) + pager := adminClient.NewListTopicsPager(nil) var topicsToDelete []string diff --git a/sdk/messaging/azservicebus/migrationguide.md b/sdk/messaging/azservicebus/migrationguide.md index 2ff38e15264e..89887652c1f3 100644 --- a/sdk/messaging/azservicebus/migrationguide.md +++ b/sdk/messaging/azservicebus/migrationguide.md @@ -206,7 +206,7 @@ Administration features, like creating queues, topics and subscriptions, has bee adminClient, err := admin.NewClientFromConnectionString(connectionString, nil) // create a queue with default properties -resp, err := adminClient.CreateQueue(context.TODO(), "queue-name", nil, nil) +resp, err := adminClient.CreateQueue(context.TODO(), "queue-name", nil) // or create a queue and configure some properties ```