Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Allow for passing in skip and top parameters to calls that list subsc…
Browse files Browse the repository at this point in the history
…riptions, queues and topics #234

The current implementation that lists resources (using ATOM) doesn't allow passing in skip, which would allow you to list more than the default page size of 100 items.

This PR adds in skip (and top, which controls the size of the page) for listing queues, topics and subscriptions which allows you, over multiple calls and incrementing skip, to get the list of all entities.

Fixes #231
  • Loading branch information
richardpark-msft committed Jul 13, 2021
2 parents 80fa1bc + 82d7a09 commit d256ec3
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 10 deletions.
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ test-race: ARGS=-race ## Run tests with race detector
test-cover: ARGS=-cover -coverprofile=cover.out -v ## Run tests in verbose mode with coverage
$(TEST_TARGETS): NAME=$(MAKECMDGOALS:test-%=%)
$(TEST_TARGETS): test
check test tests: cyclo lint vet terraform.tfstate; $(info $(M) running $(NAME:%=% )tests…) @ ## Run tests
$Q cd $(BASE) && $(GO) test -timeout $(TIMEOUT)s $(ARGS) $(TESTPKGS) 2>&1 | $(GOJUNITRPT) > report.xml
check test tests: lint vet terraform.tfstate; $(info $(M) running $(NAME:%=% )tests…) @ ## Run tests
$Q cd $(BASE) && \
$(GO) test -timeout $(TIMEOUT)s $(ARGS) $(TESTPKGS) 2>&1 | tee gotestoutput.log && \
$(GOJUNITRPT) < gotestoutput.log > report.xml && \
rm -f gotestoutput.log

.PHONY: vet
vet: $(GOLINT) ; $(info $(M) running vet…) @ ## Run vet
Expand Down
8 changes: 6 additions & 2 deletions azuredeploy.tf
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,14 @@ output "AZURE_TENANT_ID" {
}

output "AZURE_CLIENT_ID" {
value = compact(concat(azuread_application.test.*.application_id, list(data.azurerm_client_config.current.client_id)))[0]
value = compact(
concat(azuread_application.test.*.application_id, [data.azurerm_client_config.current.client_id])
)[0]
}

output "AZURE_CLIENT_SECRET" {
value = compact(concat(azuread_service_principal_password.test.*.value, list(var.azure_client_secret)))[0]
value = compact(
concat(azuread_service_principal_password.test.*.value, [var.azure_client_secret])
)[0]
sensitive = true
}
26 changes: 26 additions & 0 deletions internal/manager_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package internal

import (
"fmt"
"net/url"
)

// ConstructAtomPath adds the proper parameters for skip and top
// This is common for the list operations for queues, topics and subscriptions.
func ConstructAtomPath(basePath string, skip int, top int) string {
values := url.Values{}

if skip > 0 {
values.Add("$skip", fmt.Sprintf("%d", skip))
}

if top > 0 {
values.Add("$top", fmt.Sprintf("%d", top))
}

if len(values) == 0 {
return basePath
}

return fmt.Sprintf("%s?%s", basePath, values.Encode())
}
20 changes: 20 additions & 0 deletions internal/manager_common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package internal

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestConstructAtomPath(t *testing.T) {
basePath := ConstructAtomPath("/something", 1, 2)

// I'm assuming the ordering is non-deterministic since the underlying values are just a map
assert.Truef(t, basePath == "/something?%24skip=1&%24top=2" || basePath == "/something?%24top=2&%24skip=1", "%s wasn't one of our two variations", basePath)

basePath = ConstructAtomPath("/something", 0, -1)
assert.EqualValues(t, "/something", basePath, "Values <= 0 are ignored")

basePath = ConstructAtomPath("/something", -1, 0)
assert.EqualValues(t, "/something", basePath, "Values <= 0 are ignored")
}
43 changes: 41 additions & 2 deletions queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/devigned/tab"

"github.com/Azure/azure-service-bus-go/atom"
"github.com/Azure/azure-service-bus-go/internal"
)

type (
Expand Down Expand Up @@ -54,6 +55,34 @@ type (
}
)

type (
// ListQueuesOptions provides options for List() to control things like page size.
// NOTE: Use the ListQueuesWith* methods to specify this.
ListQueuesOptions struct {
top int
skip int
}

// ListQueuesOption represents named options for listing topics
ListQueuesOption func(*ListQueuesOptions) error
)

// ListQueuesWithSkip will skip the specified number of entities
func ListQueuesWithSkip(skip int) ListQueuesOption {
return func(options *ListQueuesOptions) error {
options.skip = skip
return nil
}
}

// ListQueuesWithTop will return at most `top` results
func ListQueuesWithTop(top int) ListQueuesOption {
return func(options *ListQueuesOptions) error {
options.top = top
return nil
}
}

// TargetURI provides an absolute address to a target entity
func (e Entity) TargetURI() string {
split := strings.Split(e.ID, "?")
Expand Down Expand Up @@ -300,11 +329,21 @@ func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueManag
}

// List fetches all of the queues for a Service Bus Namespace
func (qm *QueueManager) List(ctx context.Context) ([]*QueueEntity, error) {
func (qm *QueueManager) List(ctx context.Context, options ...ListQueuesOption) ([]*QueueEntity, error) {
ctx, span := qm.startSpanFromContext(ctx, "sb.QueueManager.List")
defer span.End()

res, err := qm.entityManager.Get(ctx, `/$Resources/Queues`)
listQueuesOptions := ListQueuesOptions{}

for _, option := range options {
if err := option(&listQueuesOptions); err != nil {
return nil, err
}
}

basePath := internal.ConstructAtomPath(`/$Resources/Queues`, listQueuesOptions.skip, listQueuesOptions.top)

res, err := qm.entityManager.Get(ctx, basePath)
defer closeRes(ctx, res)

if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,22 @@ func testListQueues(ctx context.Context, t *testing.T, qm *QueueManager, names [
for _, name := range names {
assert.Contains(t, queueNames, name)
}

// there should be at least two entities but there could be others if the service isn't clean (which is fine)
firstSet, err := qm.List(ctx, ListQueuesWithSkip(0), ListQueuesWithTop(1))
assert.NoError(t, err)
assert.EqualValues(t, 1, len(firstSet))

secondSet, err := qm.List(ctx, ListQueuesWithSkip(1), ListQueuesWithTop(1))
assert.NoError(t, err)
assert.EqualValues(t, 1, len(secondSet))

// sanity check - we didn't just retrieve the same entity twice.
assert.NotEqualValues(t, firstSet[0].Name, secondSet[0].Name)

lastSet, err := qm.List(ctx, ListQueuesWithSkip(0), ListQueuesWithTop(2))
assert.NoError(t, err)
assert.EqualValues(t, 2, len(lastSet))
}

func (suite *serviceBusSuite) randEntityName() string {
Expand Down
43 changes: 41 additions & 2 deletions subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/devigned/tab"

"github.com/Azure/azure-service-bus-go/atom"
"github.com/Azure/azure-service-bus-go/internal"
)

type (
Expand Down Expand Up @@ -156,6 +157,34 @@ type (
SubscriptionManagementOption func(*SubscriptionDescription) error
)

type (
// ListSubscriptionsOptions provides options for List() to control things like page size.
// NOTE: Use the ListSubscriptionsWith* methods to specify this.
ListSubscriptionsOptions struct {
top int
skip int
}

//ListSubscriptionsOption represents named options for listing topics
ListSubscriptionsOption func(*ListSubscriptionsOptions) error
)

// ListSubscriptionsWithSkip will skip the specified number of entities
func ListSubscriptionsWithSkip(skip int) ListSubscriptionsOption {
return func(options *ListSubscriptionsOptions) error {
options.skip = skip
return nil
}
}

// ListSubscriptionsWithTop will return at most `top` results
func ListSubscriptionsWithTop(top int) ListSubscriptionsOption {
return func(options *ListSubscriptionsOptions) error {
options.top = top
return nil
}
}

// NewSubscriptionManager creates a new SubscriptionManager for a Service Bus Topic
func (t *Topic) NewSubscriptionManager() *SubscriptionManager {
return &SubscriptionManager{
Expand Down Expand Up @@ -249,11 +278,21 @@ func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...Sub
}

// List fetches all of the Topics for a Service Bus Namespace
func (sm *SubscriptionManager) List(ctx context.Context) ([]*SubscriptionEntity, error) {
func (sm *SubscriptionManager) List(ctx context.Context, options ...ListSubscriptionsOption) ([]*SubscriptionEntity, error) {
ctx, span := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.List")
defer span.End()

res, err := sm.entityManager.Get(ctx, "/"+sm.Topic.Name+"/subscriptions")
listSubscriptionsOptions := ListSubscriptionsOptions{}

for _, option := range options {
if err := option(&listSubscriptionsOptions); err != nil {
return nil, err
}
}

basePath := internal.ConstructAtomPath("/"+sm.Topic.Name+"/subscriptions", listSubscriptionsOptions.skip, listSubscriptionsOptions.top)

res, err := sm.entityManager.Get(ctx, basePath)
defer closeRes(ctx, res)

if err != nil {
Expand Down
44 changes: 44 additions & 0 deletions subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,50 @@ func (suite *serviceBusSuite) TestSubscriptionManagement() {
suite.testSubscriptionManager(tests)
}

func (suite *serviceBusSuite) TestSubscriptionManagementReads() {
tests := map[string]func(ctx context.Context, t *testing.T, sm *SubscriptionManager, topicName, name string){
"TestListSubscriptions": testListSubscriptions,
}

suite.testSubscriptionManager(tests)
}

func testListSubscriptions(ctx context.Context, t *testing.T, sm *SubscriptionManager, _, name string) {
names := []string{name + "-1", name + "-2"}

for _, name := range names {
buildSubscription(ctx, t, sm, name)
}

subs, err := sm.List(ctx)
assert.Nil(t, err)
assert.NotNil(t, subs)
subNames := make([]string, len(subs))
for idx, s := range subs {
subNames[idx] = s.Name
}

for _, name := range names {
assert.Contains(t, subNames, name)
}

// there should be at least two entities but there could be others if the service isn't clean (which is fine)
firstSet, err := sm.List(ctx, ListSubscriptionsWithSkip(0), ListSubscriptionsWithTop(1))
assert.NoError(t, err)
assert.EqualValues(t, 1, len(firstSet))

secondSet, err := sm.List(ctx, ListSubscriptionsWithSkip(1), ListSubscriptionsWithTop(1))
assert.NoError(t, err)
assert.EqualValues(t, 1, len(secondSet))

// sanity check - we didn't just retrieve the same entity twice.
assert.NotEqualValues(t, firstSet[0].Name, secondSet[0].Name)

lastSet, err := sm.List(ctx, ListSubscriptionsWithSkip(0), ListSubscriptionsWithTop(2))
assert.NoError(t, err)
assert.EqualValues(t, 2, len(lastSet))
}

func testDefaultSubscription(ctx context.Context, t *testing.T, sm *SubscriptionManager, _, name string) {
s := buildSubscription(ctx, t, sm, name)
assert.False(t, *s.DeadLetteringOnMessageExpiration, "should not have dead lettering on expiration")
Expand Down
43 changes: 41 additions & 2 deletions topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/devigned/tab"

"github.com/Azure/azure-service-bus-go/atom"
"github.com/Azure/azure-service-bus-go/internal"
)

type (
Expand Down Expand Up @@ -49,6 +50,34 @@ type (
TopicManagementOption func(*TopicDescription) error
)

type (
// ListTopicsOptions provides options for List() to control things like page size.
// NOTE: Use the ListTopicsWith* methods to specify this.
ListTopicsOptions struct {
top int
skip int
}

// ListTopicsOption represents named options for listing topics
ListTopicsOption func(*ListTopicsOptions) error
)

// ListTopicsWithSkip will skip the specified number of entities
func ListTopicsWithSkip(skip int) ListTopicsOption {
return func(options *ListTopicsOptions) error {
options.skip = skip
return nil
}
}

// ListTopicsWithTop will return at most `top` results
func ListTopicsWithTop(top int) ListTopicsOption {
return func(options *ListTopicsOptions) error {
options.top = top
return nil
}
}

// NewTopicManager creates a new TopicManager for a Service Bus Namespace
func (ns *Namespace) NewTopicManager() *TopicManager {
return &TopicManager{
Expand Down Expand Up @@ -122,11 +151,21 @@ func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicManag
}

// List fetches all of the Topics for a Service Bus Namespace
func (tm *TopicManager) List(ctx context.Context) ([]*TopicEntity, error) {
func (tm *TopicManager) List(ctx context.Context, options ...ListTopicsOption) ([]*TopicEntity, error) {
ctx, span := tm.startSpanFromContext(ctx, "sb.TopicManager.List")
defer span.End()

res, err := tm.entityManager.Get(ctx, `/$Resources/Topics`)
listTopicsOptions := ListTopicsOptions{}

for _, option := range options {
if err := option(&listTopicsOptions); err != nil {
return nil, err
}
}

basePath := internal.ConstructAtomPath("/$Resources/Topics", listTopicsOptions.skip, listTopicsOptions.top)

res, err := tm.entityManager.Get(ctx, basePath)
defer closeRes(ctx, res)

if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,22 @@ func testListTopics(ctx context.Context, t *testing.T, tm *TopicManager, names [
for _, name := range names {
assert.Contains(t, queueNames, name)
}

// there should be at least two entities but there could be others if the service isn't clean (which is fine)
firstSet, err := tm.List(ctx, ListTopicsWithSkip(0), ListTopicsWithTop(1))
assert.NoError(t, err)
assert.EqualValues(t, 1, len(firstSet))

secondSet, err := tm.List(ctx, ListTopicsWithSkip(1), ListTopicsWithTop(1))
assert.NoError(t, err)
assert.EqualValues(t, 1, len(secondSet))

// sanity check - we didn't just retrieve the same entity twice.
assert.NotEqualValues(t, firstSet[0].Name, secondSet[0].Name)

lastSet, err := tm.List(ctx, ListTopicsWithSkip(0), ListTopicsWithTop(2))
assert.NoError(t, err)
assert.EqualValues(t, 2, len(lastSet))
}

func (suite *serviceBusSuite) TestTopicManagement() {
Expand Down

0 comments on commit d256ec3

Please sign in to comment.