From 10d9f33be3a562ace7c0eb472edab7df1b5ed80c Mon Sep 17 00:00:00 2001 From: Jacob White Date: Sat, 19 Nov 2022 14:21:26 -0500 Subject: [PATCH 1/3] Add support for queue --- .changelog/1131.txt | 3 + queue.go | 296 +++++++++++++++++++++++++++ queue_test.go | 484 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 783 insertions(+) create mode 100644 .changelog/1131.txt create mode 100644 queue.go create mode 100644 queue_test.go diff --git a/.changelog/1131.txt b/.changelog/1131.txt new file mode 100644 index 00000000000..1434c51d66c --- /dev/null +++ b/.changelog/1131.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +queue: add support queue API +``` \ No newline at end of file diff --git a/queue.go b/queue.go new file mode 100644 index 00000000000..757c14374ff --- /dev/null +++ b/queue.go @@ -0,0 +1,296 @@ +package cloudflare + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "time" +) + +var ( + ErrMissingQueueName = errors.New("required queue name is missing") + ErrMissingQueueConsumerName = errors.New("required queue consumer name is missing") +) + +type Queue struct { + ID string `json:"queue_id,omitempty"` + Name string `json:"queue_name,omitempty"` + CreatedOn *time.Time `json:"created_on,omitempty"` + ModifiedOn *time.Time `json:"modified_on,omitempty"` + ProducersTotalCount int `json:"producers_total_count,omitempty"` + Producers []QueueProducer `json:"producers,omitempty"` + ConsumersTotalCount int `json:"consumers_total_count,omitempty"` + Consumers []QueueConsumer `json:"consumers,omitempty"` +} + +type QueueProducer struct { + Service string `json:"service,omitempty"` + Environment string `json:"environment,omitempty"` +} + +type QueueConsumer struct { + Service string `json:"service,omitempty"` + ScriptName string `json:"script_name,omitempty"` + Environment string `json:"environment,omitempty"` + Settings QueueConsumerSettings `json:"settings,omitempty"` + QueueName string `json:"queue_name,omitempty"` + CreatedOn *time.Time `json:"created_on,omitempty"` + DeadLetterQueue string `json:"dead_letter_queue,omitempty"` +} + +type QueueConsumerSettings struct { + BatchSize int `json:"batch_size,omitempty"` + MaxRetires int `json:"max_retries,omitempty"` + MaxWaitTime int `json:"max_wait_time_ms,omitempty"` +} + +type QueueListResponse struct { + Response + ResultInfo `json:"result_info"` + Result []Queue `json:"result"` +} + +type QueueCreateParams struct { + Name string `json:"queue_name"` +} + +type QueueResponse struct { + Response + Result Queue `json:"result"` +} + +type QueueListConsumersResponse struct { + Response + ResultInfo `json:"result_info"` + Result []QueueConsumer `json:"result"` +} + +type QueueConsumerResponse struct { + Response + Result QueueConsumer `json:"result"` +} + +// QueueList returns the queues owned by an account. +// +// API reference: https://api.cloudflare.com/#queue-list-queues +func (api *API) QueueList(ctx context.Context, rc *ResourceContainer) ([]Queue, error) { + if rc.Identifier == "" { + return []Queue{}, ErrMissingAccountID + } + uri := fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier) + res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) + if err != nil { + return []Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + var r QueueListResponse + err = json.Unmarshal(res, &r) + if err != nil { + return []Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + return r.Result, nil +} + +// QueueCreate creates a new queue. +// +// API reference: https://api.cloudflare.com/#queue-create-queue +func (api *API) QueueCreate(ctx context.Context, rc *ResourceContainer, queue QueueCreateParams) (Queue, error) { + if rc.Identifier == "" { + return Queue{}, ErrMissingAccountID + } + + if queue.Name == "" { + return Queue{}, ErrMissingQueueName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier) + res, err := api.makeRequestContext(ctx, http.MethodPost, uri, queue) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + var r QueueResponse + err = json.Unmarshal(res, &r) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + return r.Result, nil +} + +// QueueDelete deletes a queue. +// +// API reference: https://api.cloudflare.com/#queue-delete-queue +func (api *API) QueueDelete(ctx context.Context, rc *ResourceContainer, queueName string) error { + if rc.Identifier == "" { + return ErrMissingAccountID + } + if queueName == "" { + return ErrMissingQueueName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName) + _, err := api.makeRequestContext(ctx, http.MethodDelete, uri, nil) + if err != nil { + return fmt.Errorf("%s: %w", errMakeRequestError, err) + } + return nil +} + +// QueueGet returns a queue. +// +// API reference: https://api.cloudflare.com/#queue-get-queue +func (api *API) QueueGet(ctx context.Context, rc *ResourceContainer, queueName string) (Queue, error) { + if rc.Identifier == "" { + return Queue{}, ErrMissingAccountID + } + if queueName == "" { + return Queue{}, ErrMissingQueueName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName) + res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + var r QueueResponse + err = json.Unmarshal(res, &r) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + return r.Result, nil +} + +// QueueUpdate updates a queue. +// +// API reference: https://api.cloudflare.com/#queue-update-queue +func (api *API) QueueUpdate(ctx context.Context, rc *ResourceContainer, queueName string, queue Queue) (Queue, error) { + if rc.Identifier == "" { + return Queue{}, ErrMissingAccountID + } + + if queueName == "" { + return Queue{}, ErrMissingQueueName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName) + res, err := api.makeRequestContext(ctx, http.MethodPut, uri, queue) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + var r QueueResponse + err = json.Unmarshal(res, &r) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + return r.Result, nil +} + +// QueueListConsumers returns the consumers of a queue. +// +// API reference: https://api.cloudflare.com/#queue-list-queue-consumers +func (api *API) QueueListConsumers(ctx context.Context, rc *ResourceContainer, queueName string) ([]QueueConsumer, error) { + if rc.Identifier == "" { + return []QueueConsumer{}, ErrMissingAccountID + } + + if queueName == "" { + return []QueueConsumer{}, ErrMissingQueueName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, queueName) + res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) + if err != nil { + return []QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + var r QueueListConsumersResponse + err = json.Unmarshal(res, &r) + if err != nil { + return []QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + return r.Result, nil +} + +// QueueCreateConsumer creates a new consumer for a queue. +// +// API reference: https://api.cloudflare.com/#queue-create-queue-consumer +func (api *API) QueueCreateConsumer(ctx context.Context, rc *ResourceContainer, queueName string, consumer QueueConsumer) (QueueConsumer, error) { + if rc.Identifier == "" { + return QueueConsumer{}, ErrMissingAccountID + } + + if queueName == "" { + return QueueConsumer{}, ErrMissingQueueName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, queueName) + res, err := api.makeRequestContext(ctx, http.MethodPost, uri, consumer) + if err != nil { + return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + var r QueueConsumerResponse + err = json.Unmarshal(res, &r) + if err != nil { + return QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + return r.Result, nil +} + +// QueueDeleteConsumer deletes the consumer for a queue.. +// +// API reference: https://api.cloudflare.com/#queue-delete-queue-consumer +func (api *API) QueueDeleteConsumer(ctx context.Context, rc *ResourceContainer, queueName, consumerName string) error { + if rc.Identifier == "" { + return ErrMissingAccountID + } + + if queueName == "" { + return ErrMissingQueueName + } + + if consumerName == "" { + return ErrMissingQueueConsumerName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, queueName, consumerName) + _, err := api.makeRequestContext(ctx, http.MethodDelete, uri, nil) + if err != nil { + return fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + return nil +} + +// QueueUpdateConsumer updates the consumer for a queue, or creates one if it does not exist.. +// +// API reference: https://api.cloudflare.com/#queue-update-queue-consumer +func (api *API) QueueUpdateConsumer(ctx context.Context, rc *ResourceContainer, queueName, consumerName string, consumer QueueConsumer) (QueueConsumer, error) { + if rc.Identifier == "" { + return QueueConsumer{}, ErrMissingAccountID + } + + if queueName == "" { + return QueueConsumer{}, ErrMissingQueueName + } + + if consumerName == "" { + return QueueConsumer{}, ErrMissingQueueConsumerName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, queueName, consumerName) + res, err := api.makeRequestContext(ctx, http.MethodPut, uri, consumer) + if err != nil { + return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + var r QueueConsumerResponse + err = json.Unmarshal(res, &r) + if err != nil { + return QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + return r.Result, nil +} diff --git a/queue_test.go b/queue_test.go new file mode 100644 index 00000000000..c210782d6a9 --- /dev/null +++ b/queue_test.go @@ -0,0 +1,484 @@ +package cloudflare + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +const ( + testQueueID = "6b7efc370ea34ded8327fa20698dfe3a" + testQueueName = "example-queue" + testQueueConsumerName = "example-consumer" +) + +func testQueue() Queue { + CreatedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + ModifiedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + return Queue{ + ID: testQueueID, + Name: testQueueName, + CreatedOn: &CreatedOn, + ModifiedOn: &ModifiedOn, + ProducersTotalCount: 1, + Producers: []QueueProducer{ + { + Service: "example-producer", + Environment: "production", + }, + }, + ConsumersTotalCount: 1, + Consumers: []QueueConsumer{ + { + Service: "example-consumer", + Environment: "production", + Settings: QueueConsumerSettings{ + BatchSize: 10, + MaxRetires: 3, + MaxWaitTime: 5000, + }, + }, + }, + } +} + +func testQueueConsumer() QueueConsumer { + CreatedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + return QueueConsumer{ + Service: "example-consumer", + Environment: "production", + Settings: QueueConsumerSettings{ + BatchSize: 10, + MaxRetires: 3, + MaxWaitTime: 5000, + }, + QueueName: testQueueName, + CreatedOn: &CreatedOn, + } +} + +func TestQueue_List(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues", testAccountID), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method, "Expected method 'GET', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": null, + "messages": null, + "result": [ + { + "queue_id": "6b7efc370ea34ded8327fa20698dfe3a", + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z", + "modified_on": "2023-01-01T00:00:00Z", + "producers_total_count": 1, + "producers": [ + { + "service": "example-producer", + "environment": "production" + } + ], + "consumers_total_count": 1, + "consumers": [ + { + "service": "example-consumer", + "environment": "production", + "settings": { + "batch_size": 10, + "max_retries": 3, + "max_wait_time_ms": 5000 + } + } + ] + } + ], + "result_info": { + "page": 1, + "per_page": 100, + "count": 1, + "total_count": 1, + "total_pages": 1 + } +}`) + }) + + _, err := client.QueueList(context.Background(), AccountIdentifier("")) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + result, err := client.QueueList(context.Background(), AccountIdentifier(testAccountID)) + if assert.NoError(t, err) { + assert.Equal(t, 1, len(result)) + assert.Equal(t, testQueue(), result[0]) + } +} + +func TestQueue_Create(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues", testAccountID), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPost, r.Method, "Expected method 'POST', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": null, + "messages": null, + "result": { + "queue_id": "6b7efc370ea34ded8327fa20698dfe3a", + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z", + "modified_on": "2023-01-01T00:00:00Z" + } + }`) + }) + _, err := client.QueueCreate(context.Background(), AccountIdentifier(""), QueueCreateParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, err = client.QueueCreate(context.Background(), AccountIdentifier(testAccountID), QueueCreateParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + results, err := client.QueueCreate(context.Background(), AccountIdentifier(testAccountID), QueueCreateParams{Name: "example-queue"}) + if assert.NoError(t, err) { + CreatedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + ModifiedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + createdQueue := Queue{ + ID: testQueueID, + Name: testQueueName, + CreatedOn: &CreatedOn, + ModifiedOn: &ModifiedOn, + } + + assert.Equal(t, createdQueue, results) + } +} + +func TestQueue_Delete(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s", testAccountID, testQueueName), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodDelete, r.Method, "Expected method 'DELETE', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": [], + "messages": [], + "result": null + }`) + }) + err := client.QueueDelete(context.Background(), AccountIdentifier(""), "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + err = client.QueueDelete(context.Background(), AccountIdentifier(testAccountID), "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + err = client.QueueDelete(context.Background(), AccountIdentifier(testAccountID), testQueueName) + assert.NoError(t, err) +} + +func TestQueue_Get(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s", testAccountID, testQueueID), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method, "Expected method 'GET', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, ` + { + "success": true, + "errors": [], + "messages": [], + "result": { + "queue_id": "6b7efc370ea34ded8327fa20698dfe3a", + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z", + "modified_on": "2023-01-01T00:00:00Z", + "producers_total_count": 1, + "producers": [ + { + "service": "example-producer", + "environment": "production" + } + ], + "consumers_total_count": 1, + "consumers": [ + { + "service": "example-consumer", + "environment": "production", + "settings": { + "batch_size": 10, + "max_retries": 3, + "max_wait_time_ms": 5000 + } + } + ] + } + }`) + }) + + _, err := client.QueueGet(context.Background(), AccountIdentifier(""), "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, err = client.QueueGet(context.Background(), AccountIdentifier(testAccountID), "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + result, err := client.QueueGet(context.Background(), AccountIdentifier(testAccountID), testQueueID) + if assert.NoError(t, err) { + assert.Equal(t, testQueue(), result) + } +} + +func TestQueue_Update(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s", testAccountID, testQueueID), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPut, r.Method, "Expected method 'PATCH', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": null, + "messages": null, + "result": { + "queue_id": "6b7efc370ea34ded8327fa20698dfe3a", + "queue_name": "renamed-example-queue", + "created_on": "2023-01-01T00:00:00Z", + "modified_on": "2023-01-01T00:00:00Z" + } + }`) + }) + _, err := client.QueueUpdate(context.Background(), AccountIdentifier(""), "", Queue{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, err = client.QueueUpdate(context.Background(), AccountIdentifier(testAccountID), "", Queue{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + results, err := client.QueueUpdate(context.Background(), AccountIdentifier(testAccountID), testQueueID, Queue{Name: "example-queue"}) + if assert.NoError(t, err) { + CreatedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + ModifiedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + createdQueue := Queue{ + ID: testQueueID, + Name: "renamed-example-queue", + CreatedOn: &CreatedOn, + ModifiedOn: &ModifiedOn, + } + + assert.Equal(t, createdQueue, results) + } +} + +func TestQueue_ListConsumers(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", testAccountID, testQueueID), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method, "Expected method 'GET', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, ` + { + "success": true, + "errors": null, + "messages": null, + "result": [ + { + "service": "example-consumer", + "environment": "production", + "settings": { + "batch_size": 10, + "max_retries": 3, + "max_wait_time_ms": 5000 + }, + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z" + } + ], + "result_info": { + "page": 1, + "per_page": 100, + "count": 1, + "total_count": 1, + "total_pages": 1 + } + }`) + }) + + _, err := client.QueueListConsumers(context.Background(), AccountIdentifier(""), "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, err = client.QueueListConsumers(context.Background(), AccountIdentifier(testAccountID), "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + result, err := client.QueueListConsumers(context.Background(), AccountIdentifier(testAccountID), testQueueID) + if assert.NoError(t, err) { + assert.Equal(t, 1, len(result)) + assert.Equal(t, testQueueConsumer(), result[0]) + } +} + +func TestQueue_CreateConsumer(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", testAccountID, testQueueName), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPost, r.Method, "Expected method 'POST', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": [], + "messages": [], + "result": { + "service": "example-consumer", + "environment": "production", + "settings": { + "batch_size": 10, + "max_retries": 3, + "max_wait_time_ms": 5000 + }, + "dead_letter_queue": "example-dlq", + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z" + } + }`) + }) + + _, err := client.QueueCreateConsumer(context.Background(), AccountIdentifier(""), "", QueueConsumer{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, err = client.QueueCreateConsumer(context.Background(), AccountIdentifier(testAccountID), "", QueueConsumer{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + result, err := client.QueueCreateConsumer(context.Background(), AccountIdentifier(testAccountID), testQueueName, QueueConsumer{ + Service: "example-consumer", + Environment: "production", + }) + if assert.NoError(t, err) { + expectedQueueConsumer := testQueueConsumer() + expectedQueueConsumer.DeadLetterQueue = "example-dlq" + assert.Equal(t, expectedQueueConsumer, result) + } +} + +func TestQueue_DeleteConsumer(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", testAccountID, testQueueName, testQueueConsumerName), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodDelete, r.Method, "Expected method 'DELETE', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": [], + "messages": [], + "result": null + }`) + }) + + err := client.QueueDeleteConsumer(context.Background(), AccountIdentifier(""), "", "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + err = client.QueueDeleteConsumer(context.Background(), AccountIdentifier(testAccountID), "", "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + err = client.QueueDeleteConsumer(context.Background(), AccountIdentifier(testAccountID), testQueueName, "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueConsumerName, err) + } + + err = client.QueueDeleteConsumer(context.Background(), AccountIdentifier(testAccountID), testQueueName, testQueueConsumerName) + assert.NoError(t, err) +} + +func TestQueue_UpdateConsumer(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", testAccountID, testQueueName, testQueueConsumerName), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPut, r.Method, "Expected method 'PUT', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": [], + "messages": [], + "result": { + "service": "example-consumer", + "environment": "production", + "settings": { + "batch_size": 10, + "max_retries": 3, + "max_wait_time_ms": 5000 + }, + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z" + } + }`) + }) + + _, err := client.QueueUpdateConsumer(context.Background(), AccountIdentifier(""), "", "", QueueConsumer{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, err = client.QueueUpdateConsumer(context.Background(), AccountIdentifier(testAccountID), "", "", QueueConsumer{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + _, err = client.QueueUpdateConsumer(context.Background(), AccountIdentifier(testAccountID), testQueueName, "", QueueConsumer{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueConsumerName, err) + } + + result, err := client.QueueUpdateConsumer(context.Background(), AccountIdentifier(testAccountID), testQueueName, testQueueConsumerName, QueueConsumer{ + Service: "example-consumer", + Environment: "production", + }) + if assert.NoError(t, err) { + assert.Equal(t, testQueueConsumer(), result) + } +} From 02c191694ce8eabadffb9bbcd3ac2fcd1c391420 Mon Sep 17 00:00:00 2001 From: Jacob Bednarz Date: Mon, 21 Nov 2022 11:24:48 +1100 Subject: [PATCH 2/3] fix method naming conventions and tests --- queue.go | 109 +++++++++++++++++++++++++++++++++----------------- queue_test.go | 67 ++++++++++++++++--------------- 2 files changed, 107 insertions(+), 69 deletions(-) diff --git a/queue.go b/queue.go index 757c14374ff..0f2332dc9a7 100644 --- a/queue.go +++ b/queue.go @@ -31,6 +31,7 @@ type QueueProducer struct { } type QueueConsumer struct { + Name string `json:"-"` Service string `json:"service,omitempty"` ScriptName string `json:"script_name,omitempty"` Environment string `json:"environment,omitempty"` @@ -52,7 +53,7 @@ type QueueListResponse struct { Result []Queue `json:"result"` } -type QueueCreateParams struct { +type CreateQueueParams struct { Name string `json:"queue_name"` } @@ -61,41 +62,75 @@ type QueueResponse struct { Result Queue `json:"result"` } -type QueueListConsumersResponse struct { +type ListQueueConsumersResponse struct { Response ResultInfo `json:"result_info"` Result []QueueConsumer `json:"result"` } +type ListQueuesParams struct{} + type QueueConsumerResponse struct { Response Result QueueConsumer `json:"result"` } -// QueueList returns the queues owned by an account. +type UpdateQueueParams struct { + ID string `json:"queue_id,omitempty"` + Name string `json:"queue_name,omitempty"` + CreatedOn *time.Time `json:"created_on,omitempty"` + ModifiedOn *time.Time `json:"modified_on,omitempty"` + ProducersTotalCount int `json:"producers_total_count,omitempty"` + Producers []QueueProducer `json:"producers,omitempty"` + ConsumersTotalCount int `json:"consumers_total_count,omitempty"` + Consumers []QueueConsumer `json:"consumers,omitempty"` +} + +type ListQueueConsumersParams struct { + QueueName string `url:"-"` +} + +type CreateQueueConsumerParams struct { + QueueName string `json:"-"` + Consumer QueueConsumer +} + +type UpdateQueueConsumerParams struct { + QueueName string `json:"-"` + Consumer QueueConsumer +} + +type DeleteQueueConsumerParams struct { + QueueName, ConsumerName string +} + +// ListQueues returns the queues owned by an account. // // API reference: https://api.cloudflare.com/#queue-list-queues -func (api *API) QueueList(ctx context.Context, rc *ResourceContainer) ([]Queue, error) { +func (api *API) ListQueues(ctx context.Context, rc *ResourceContainer, params ListQueuesParams) ([]Queue, error) { if rc.Identifier == "" { return []Queue{}, ErrMissingAccountID } + uri := fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier) res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) if err != nil { return []Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err) } + var r QueueListResponse err = json.Unmarshal(res, &r) if err != nil { return []Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err) } + return r.Result, nil } -// QueueCreate creates a new queue. +// CreateQueue creates a new queue. // // API reference: https://api.cloudflare.com/#queue-create-queue -func (api *API) QueueCreate(ctx context.Context, rc *ResourceContainer, queue QueueCreateParams) (Queue, error) { +func (api *API) CreateQueue(ctx context.Context, rc *ResourceContainer, queue CreateQueueParams) (Queue, error) { if rc.Identifier == "" { return Queue{}, ErrMissingAccountID } @@ -118,10 +153,10 @@ func (api *API) QueueCreate(ctx context.Context, rc *ResourceContainer, queue Qu return r.Result, nil } -// QueueDelete deletes a queue. +// DeleteQueue deletes a queue. // // API reference: https://api.cloudflare.com/#queue-delete-queue -func (api *API) QueueDelete(ctx context.Context, rc *ResourceContainer, queueName string) error { +func (api *API) DeleteQueue(ctx context.Context, rc *ResourceContainer, queueName string) error { if rc.Identifier == "" { return ErrMissingAccountID } @@ -137,13 +172,14 @@ func (api *API) QueueDelete(ctx context.Context, rc *ResourceContainer, queueNam return nil } -// QueueGet returns a queue. +// GetQueue returns a single queue based on the name. // // API reference: https://api.cloudflare.com/#queue-get-queue -func (api *API) QueueGet(ctx context.Context, rc *ResourceContainer, queueName string) (Queue, error) { +func (api *API) GetQueue(ctx context.Context, rc *ResourceContainer, queueName string) (Queue, error) { if rc.Identifier == "" { return Queue{}, ErrMissingAccountID } + if queueName == "" { return Queue{}, ErrMissingQueueName } @@ -159,23 +195,24 @@ func (api *API) QueueGet(ctx context.Context, rc *ResourceContainer, queueName s if err != nil { return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err) } + return r.Result, nil } -// QueueUpdate updates a queue. +// UpdateQueue updates a queue. // // API reference: https://api.cloudflare.com/#queue-update-queue -func (api *API) QueueUpdate(ctx context.Context, rc *ResourceContainer, queueName string, queue Queue) (Queue, error) { +func (api *API) UpdateQueue(ctx context.Context, rc *ResourceContainer, params UpdateQueueParams) (Queue, error) { if rc.Identifier == "" { return Queue{}, ErrMissingAccountID } - if queueName == "" { + if params.Name == "" { return Queue{}, ErrMissingQueueName } - uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName) - res, err := api.makeRequestContext(ctx, http.MethodPut, uri, queue) + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, params.Name) + res, err := api.makeRequestContext(ctx, http.MethodPut, uri, nil) if err != nil { return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err) } @@ -188,25 +225,25 @@ func (api *API) QueueUpdate(ctx context.Context, rc *ResourceContainer, queueNam return r.Result, nil } -// QueueListConsumers returns the consumers of a queue. +// ListQueueConsumers returns the consumers of a queue. // // API reference: https://api.cloudflare.com/#queue-list-queue-consumers -func (api *API) QueueListConsumers(ctx context.Context, rc *ResourceContainer, queueName string) ([]QueueConsumer, error) { +func (api *API) ListQueueConsumers(ctx context.Context, rc *ResourceContainer, params ListQueueConsumersParams) ([]QueueConsumer, error) { if rc.Identifier == "" { return []QueueConsumer{}, ErrMissingAccountID } - if queueName == "" { + if params.QueueName == "" { return []QueueConsumer{}, ErrMissingQueueName } - uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, queueName) + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName) res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) if err != nil { return []QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err) } - var r QueueListConsumersResponse + var r ListQueueConsumersResponse err = json.Unmarshal(res, &r) if err != nil { return []QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err) @@ -214,20 +251,20 @@ func (api *API) QueueListConsumers(ctx context.Context, rc *ResourceContainer, q return r.Result, nil } -// QueueCreateConsumer creates a new consumer for a queue. +// CreateQueueConsumer creates a new consumer for a queue. // // API reference: https://api.cloudflare.com/#queue-create-queue-consumer -func (api *API) QueueCreateConsumer(ctx context.Context, rc *ResourceContainer, queueName string, consumer QueueConsumer) (QueueConsumer, error) { +func (api *API) CreateQueueConsumer(ctx context.Context, rc *ResourceContainer, params CreateQueueConsumerParams) (QueueConsumer, error) { if rc.Identifier == "" { return QueueConsumer{}, ErrMissingAccountID } - if queueName == "" { + if params.QueueName == "" { return QueueConsumer{}, ErrMissingQueueName } - uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, queueName) - res, err := api.makeRequestContext(ctx, http.MethodPost, uri, consumer) + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName) + res, err := api.makeRequestContext(ctx, http.MethodPost, uri, params.Consumer) if err != nil { return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err) } @@ -240,23 +277,23 @@ func (api *API) QueueCreateConsumer(ctx context.Context, rc *ResourceContainer, return r.Result, nil } -// QueueDeleteConsumer deletes the consumer for a queue.. +// DeleteQueueConsumer deletes the consumer for a queue.. // // API reference: https://api.cloudflare.com/#queue-delete-queue-consumer -func (api *API) QueueDeleteConsumer(ctx context.Context, rc *ResourceContainer, queueName, consumerName string) error { +func (api *API) DeleteQueueConsumer(ctx context.Context, rc *ResourceContainer, params DeleteQueueConsumerParams) error { if rc.Identifier == "" { return ErrMissingAccountID } - if queueName == "" { + if params.QueueName == "" { return ErrMissingQueueName } - if consumerName == "" { + if params.ConsumerName == "" { return ErrMissingQueueConsumerName } - uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, queueName, consumerName) + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, params.QueueName, params.ConsumerName) _, err := api.makeRequestContext(ctx, http.MethodDelete, uri, nil) if err != nil { return fmt.Errorf("%s: %w", errMakeRequestError, err) @@ -265,24 +302,24 @@ func (api *API) QueueDeleteConsumer(ctx context.Context, rc *ResourceContainer, return nil } -// QueueUpdateConsumer updates the consumer for a queue, or creates one if it does not exist.. +// UpdateQueueConsumer updates the consumer for a queue, or creates one if it does not exist.. // // API reference: https://api.cloudflare.com/#queue-update-queue-consumer -func (api *API) QueueUpdateConsumer(ctx context.Context, rc *ResourceContainer, queueName, consumerName string, consumer QueueConsumer) (QueueConsumer, error) { +func (api *API) UpdateQueueConsumer(ctx context.Context, rc *ResourceContainer, params UpdateQueueConsumerParams) (QueueConsumer, error) { if rc.Identifier == "" { return QueueConsumer{}, ErrMissingAccountID } - if queueName == "" { + if params.QueueName == "" { return QueueConsumer{}, ErrMissingQueueName } - if consumerName == "" { + if params.Consumer.Name == "" { return QueueConsumer{}, ErrMissingQueueConsumerName } - uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, queueName, consumerName) - res, err := api.makeRequestContext(ctx, http.MethodPut, uri, consumer) + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, params.QueueName, params.Consumer.Name) + res, err := api.makeRequestContext(ctx, http.MethodPut, uri, params.Consumer) if err != nil { return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err) } diff --git a/queue_test.go b/queue_test.go index c210782d6a9..f35c24960b1 100644 --- a/queue_test.go +++ b/queue_test.go @@ -110,12 +110,12 @@ func TestQueue_List(t *testing.T) { }`) }) - _, err := client.QueueList(context.Background(), AccountIdentifier("")) + _, err := client.ListQueues(context.Background(), AccountIdentifier(""), ListQueuesParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingAccountID, err) } - result, err := client.QueueList(context.Background(), AccountIdentifier(testAccountID)) + result, err := client.ListQueues(context.Background(), AccountIdentifier(testAccountID), ListQueuesParams{}) if assert.NoError(t, err) { assert.Equal(t, 1, len(result)) assert.Equal(t, testQueue(), result[0]) @@ -142,16 +142,16 @@ func TestQueue_Create(t *testing.T) { } }`) }) - _, err := client.QueueCreate(context.Background(), AccountIdentifier(""), QueueCreateParams{}) + _, err := client.CreateQueue(context.Background(), AccountIdentifier(""), CreateQueueParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingAccountID, err) } - _, err = client.QueueCreate(context.Background(), AccountIdentifier(testAccountID), QueueCreateParams{}) + _, err = client.CreateQueue(context.Background(), AccountIdentifier(testAccountID), CreateQueueParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingQueueName, err) } - results, err := client.QueueCreate(context.Background(), AccountIdentifier(testAccountID), QueueCreateParams{Name: "example-queue"}) + results, err := client.CreateQueue(context.Background(), AccountIdentifier(testAccountID), CreateQueueParams{Name: "example-queue"}) if assert.NoError(t, err) { CreatedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") ModifiedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") @@ -181,17 +181,17 @@ func TestQueue_Delete(t *testing.T) { "result": null }`) }) - err := client.QueueDelete(context.Background(), AccountIdentifier(""), "") + err := client.DeleteQueue(context.Background(), AccountIdentifier(""), "") if assert.Error(t, err) { assert.Equal(t, ErrMissingAccountID, err) } - err = client.QueueDelete(context.Background(), AccountIdentifier(testAccountID), "") + err = client.DeleteQueue(context.Background(), AccountIdentifier(testAccountID), "") if assert.Error(t, err) { assert.Equal(t, ErrMissingQueueName, err) } - err = client.QueueDelete(context.Background(), AccountIdentifier(testAccountID), testQueueName) + err = client.DeleteQueue(context.Background(), AccountIdentifier(testAccountID), testQueueName) assert.NoError(t, err) } @@ -236,17 +236,17 @@ func TestQueue_Get(t *testing.T) { }`) }) - _, err := client.QueueGet(context.Background(), AccountIdentifier(""), "") + _, err := client.GetQueue(context.Background(), AccountIdentifier(""), "") if assert.Error(t, err) { assert.Equal(t, ErrMissingAccountID, err) } - _, err = client.QueueGet(context.Background(), AccountIdentifier(testAccountID), "") + _, err = client.GetQueue(context.Background(), AccountIdentifier(testAccountID), "") if assert.Error(t, err) { assert.Equal(t, ErrMissingQueueName, err) } - result, err := client.QueueGet(context.Background(), AccountIdentifier(testAccountID), testQueueID) + result, err := client.GetQueue(context.Background(), AccountIdentifier(testAccountID), testQueueID) if assert.NoError(t, err) { assert.Equal(t, testQueue(), result) } @@ -256,8 +256,8 @@ func TestQueue_Update(t *testing.T) { setup() defer teardown() - mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s", testAccountID, testQueueID), func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, http.MethodPut, r.Method, "Expected method 'PATCH', got %s", r.Method) + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s", testAccountID, testQueueName), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPut, r.Method, "Expected method 'PUT', got %s", r.Method) w.Header().Set("content-type", "application/json") fmt.Fprintf(w, `{ @@ -272,17 +272,17 @@ func TestQueue_Update(t *testing.T) { } }`) }) - _, err := client.QueueUpdate(context.Background(), AccountIdentifier(""), "", Queue{}) + _, err := client.UpdateQueue(context.Background(), AccountIdentifier(""), UpdateQueueParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingAccountID, err) } - _, err = client.QueueUpdate(context.Background(), AccountIdentifier(testAccountID), "", Queue{}) + _, err = client.UpdateQueue(context.Background(), AccountIdentifier(testAccountID), UpdateQueueParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingQueueName, err) } - results, err := client.QueueUpdate(context.Background(), AccountIdentifier(testAccountID), testQueueID, Queue{Name: "example-queue"}) + results, err := client.UpdateQueue(context.Background(), AccountIdentifier(testAccountID), UpdateQueueParams{Name: "example-queue"}) if assert.NoError(t, err) { CreatedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") ModifiedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") @@ -301,7 +301,7 @@ func TestQueue_ListConsumers(t *testing.T) { setup() defer teardown() - mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", testAccountID, testQueueID), func(w http.ResponseWriter, r *http.Request) { + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", testAccountID, testQueueName), func(w http.ResponseWriter, r *http.Request) { assert.Equal(t, http.MethodGet, r.Method, "Expected method 'GET', got %s", r.Method) w.Header().Set("content-type", "application/json") @@ -333,17 +333,17 @@ func TestQueue_ListConsumers(t *testing.T) { }`) }) - _, err := client.QueueListConsumers(context.Background(), AccountIdentifier(""), "") + _, err := client.ListQueueConsumers(context.Background(), AccountIdentifier(""), ListQueueConsumersParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingAccountID, err) } - _, err = client.QueueListConsumers(context.Background(), AccountIdentifier(testAccountID), "") + _, err = client.ListQueueConsumers(context.Background(), AccountIdentifier(testAccountID), ListQueueConsumersParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingQueueName, err) } - result, err := client.QueueListConsumers(context.Background(), AccountIdentifier(testAccountID), testQueueID) + result, err := client.ListQueueConsumers(context.Background(), AccountIdentifier(testAccountID), ListQueueConsumersParams{QueueName: testQueueName}) if assert.NoError(t, err) { assert.Equal(t, 1, len(result)) assert.Equal(t, testQueueConsumer(), result[0]) @@ -377,20 +377,20 @@ func TestQueue_CreateConsumer(t *testing.T) { }`) }) - _, err := client.QueueCreateConsumer(context.Background(), AccountIdentifier(""), "", QueueConsumer{}) + _, err := client.CreateQueueConsumer(context.Background(), AccountIdentifier(""), CreateQueueConsumerParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingAccountID, err) } - _, err = client.QueueCreateConsumer(context.Background(), AccountIdentifier(testAccountID), "", QueueConsumer{}) + _, err = client.CreateQueueConsumer(context.Background(), AccountIdentifier(testAccountID), CreateQueueConsumerParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingQueueName, err) } - result, err := client.QueueCreateConsumer(context.Background(), AccountIdentifier(testAccountID), testQueueName, QueueConsumer{ + result, err := client.CreateQueueConsumer(context.Background(), AccountIdentifier(testAccountID), CreateQueueConsumerParams{QueueName: testQueueName, Consumer: QueueConsumer{ Service: "example-consumer", Environment: "production", - }) + }}) if assert.NoError(t, err) { expectedQueueConsumer := testQueueConsumer() expectedQueueConsumer.DeadLetterQueue = "example-dlq" @@ -414,22 +414,22 @@ func TestQueue_DeleteConsumer(t *testing.T) { }`) }) - err := client.QueueDeleteConsumer(context.Background(), AccountIdentifier(""), "", "") + err := client.DeleteQueueConsumer(context.Background(), AccountIdentifier(""), DeleteQueueConsumerParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingAccountID, err) } - err = client.QueueDeleteConsumer(context.Background(), AccountIdentifier(testAccountID), "", "") + err = client.DeleteQueueConsumer(context.Background(), AccountIdentifier(testAccountID), DeleteQueueConsumerParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingQueueName, err) } - err = client.QueueDeleteConsumer(context.Background(), AccountIdentifier(testAccountID), testQueueName, "") + err = client.DeleteQueueConsumer(context.Background(), AccountIdentifier(testAccountID), DeleteQueueConsumerParams{QueueName: testQueueName}) if assert.Error(t, err) { assert.Equal(t, ErrMissingQueueConsumerName, err) } - err = client.QueueDeleteConsumer(context.Background(), AccountIdentifier(testAccountID), testQueueName, testQueueConsumerName) + err = client.DeleteQueueConsumer(context.Background(), AccountIdentifier(testAccountID), DeleteQueueConsumerParams{QueueName: testQueueName, ConsumerName: testQueueConsumerName}) assert.NoError(t, err) } @@ -459,25 +459,26 @@ func TestQueue_UpdateConsumer(t *testing.T) { }`) }) - _, err := client.QueueUpdateConsumer(context.Background(), AccountIdentifier(""), "", "", QueueConsumer{}) + _, err := client.UpdateQueueConsumer(context.Background(), AccountIdentifier(""), UpdateQueueConsumerParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingAccountID, err) } - _, err = client.QueueUpdateConsumer(context.Background(), AccountIdentifier(testAccountID), "", "", QueueConsumer{}) + _, err = client.UpdateQueueConsumer(context.Background(), AccountIdentifier(testAccountID), UpdateQueueConsumerParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingQueueName, err) } - _, err = client.QueueUpdateConsumer(context.Background(), AccountIdentifier(testAccountID), testQueueName, "", QueueConsumer{}) + _, err = client.UpdateQueueConsumer(context.Background(), AccountIdentifier(testAccountID), UpdateQueueConsumerParams{QueueName: testQueueName}) if assert.Error(t, err) { assert.Equal(t, ErrMissingQueueConsumerName, err) } - result, err := client.QueueUpdateConsumer(context.Background(), AccountIdentifier(testAccountID), testQueueName, testQueueConsumerName, QueueConsumer{ + result, err := client.UpdateQueueConsumer(context.Background(), AccountIdentifier(testAccountID), UpdateQueueConsumerParams{QueueName: testQueueName, Consumer: QueueConsumer{ + Name: testQueueConsumerName, Service: "example-consumer", Environment: "production", - }) + }}) if assert.NoError(t, err) { assert.Equal(t, testQueueConsumer(), result) } From 21a9f46896fcdb918b5f3a592670f085eedf2358 Mon Sep 17 00:00:00 2001 From: Jacob Bednarz Date: Mon, 21 Nov 2022 12:13:01 +1100 Subject: [PATCH 3/3] add automatic pagination for List* operations --- queue.go | 96 ++++++++++++++++++++++++++++++++++++++------------- queue_test.go | 10 +++--- 2 files changed, 77 insertions(+), 29 deletions(-) diff --git a/queue.go b/queue.go index 0f2332dc9a7..0bd5a4dfa6c 100644 --- a/queue.go +++ b/queue.go @@ -68,7 +68,9 @@ type ListQueueConsumersResponse struct { Result []QueueConsumer `json:"result"` } -type ListQueuesParams struct{} +type ListQueuesParams struct { + ResultInfo +} type QueueConsumerResponse struct { Response @@ -88,6 +90,7 @@ type UpdateQueueParams struct { type ListQueueConsumersParams struct { QueueName string `url:"-"` + ResultInfo } type CreateQueueConsumerParams struct { @@ -107,24 +110,46 @@ type DeleteQueueConsumerParams struct { // ListQueues returns the queues owned by an account. // // API reference: https://api.cloudflare.com/#queue-list-queues -func (api *API) ListQueues(ctx context.Context, rc *ResourceContainer, params ListQueuesParams) ([]Queue, error) { +func (api *API) ListQueues(ctx context.Context, rc *ResourceContainer, params ListQueuesParams) ([]Queue, *ResultInfo, error) { if rc.Identifier == "" { - return []Queue{}, ErrMissingAccountID + return []Queue{}, &ResultInfo{}, ErrMissingAccountID } - uri := fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier) - res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) - if err != nil { - return []Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + autoPaginate := true + if params.PerPage >= 1 || params.Page >= 1 { + autoPaginate = false + } + if params.PerPage < 1 { + params.PerPage = 50 + } + if params.Page < 1 { + params.Page = 1 } - var r QueueListResponse - err = json.Unmarshal(res, &r) - if err != nil { - return []Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + var queues []Queue + var qResponse QueueListResponse + for { + uri := buildURI(fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier), params) + + res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) + if err != nil { + return []Queue{}, &ResultInfo{}, err + } + + err = json.Unmarshal(res, &qResponse) + if err != nil { + return []Queue{}, &ResultInfo{}, fmt.Errorf("failed to unmarshal filters JSON data: %w", err) + } + + queues = append(queues, qResponse.Result...) + params.ResultInfo = qResponse.ResultInfo.Next() + + if params.ResultInfo.Done() || !autoPaginate { + break + } } - return r.Result, nil + return queues, &qResponse.ResultInfo, nil } // CreateQueue creates a new queue. @@ -228,27 +253,50 @@ func (api *API) UpdateQueue(ctx context.Context, rc *ResourceContainer, params U // ListQueueConsumers returns the consumers of a queue. // // API reference: https://api.cloudflare.com/#queue-list-queue-consumers -func (api *API) ListQueueConsumers(ctx context.Context, rc *ResourceContainer, params ListQueueConsumersParams) ([]QueueConsumer, error) { +func (api *API) ListQueueConsumers(ctx context.Context, rc *ResourceContainer, params ListQueueConsumersParams) ([]QueueConsumer, *ResultInfo, error) { if rc.Identifier == "" { - return []QueueConsumer{}, ErrMissingAccountID + return []QueueConsumer{}, &ResultInfo{}, ErrMissingAccountID } if params.QueueName == "" { - return []QueueConsumer{}, ErrMissingQueueName + return []QueueConsumer{}, &ResultInfo{}, ErrMissingQueueName } - uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName) - res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) - if err != nil { - return []QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + autoPaginate := true + if params.PerPage >= 1 || params.Page >= 1 { + autoPaginate = false + } + if params.PerPage < 1 { + params.PerPage = 50 + } + if params.Page < 1 { + params.Page = 1 } - var r ListQueueConsumersResponse - err = json.Unmarshal(res, &r) - if err != nil { - return []QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + var queuesConsumers []QueueConsumer + var qResponse ListQueueConsumersResponse + for { + uri := buildURI(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName), params) + + res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) + if err != nil { + return []QueueConsumer{}, &ResultInfo{}, err + } + + err = json.Unmarshal(res, &qResponse) + if err != nil { + return []QueueConsumer{}, &ResultInfo{}, fmt.Errorf("failed to unmarshal filters JSON data: %w", err) + } + + queuesConsumers = append(queuesConsumers, qResponse.Result...) + params.ResultInfo = qResponse.ResultInfo.Next() + + if params.ResultInfo.Done() || !autoPaginate { + break + } } - return r.Result, nil + + return queuesConsumers, &qResponse.ResultInfo, nil } // CreateQueueConsumer creates a new consumer for a queue. diff --git a/queue_test.go b/queue_test.go index f35c24960b1..5485b354ca8 100644 --- a/queue_test.go +++ b/queue_test.go @@ -110,12 +110,12 @@ func TestQueue_List(t *testing.T) { }`) }) - _, err := client.ListQueues(context.Background(), AccountIdentifier(""), ListQueuesParams{}) + _, _, err := client.ListQueues(context.Background(), AccountIdentifier(""), ListQueuesParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingAccountID, err) } - result, err := client.ListQueues(context.Background(), AccountIdentifier(testAccountID), ListQueuesParams{}) + result, _, err := client.ListQueues(context.Background(), AccountIdentifier(testAccountID), ListQueuesParams{}) if assert.NoError(t, err) { assert.Equal(t, 1, len(result)) assert.Equal(t, testQueue(), result[0]) @@ -333,17 +333,17 @@ func TestQueue_ListConsumers(t *testing.T) { }`) }) - _, err := client.ListQueueConsumers(context.Background(), AccountIdentifier(""), ListQueueConsumersParams{}) + _, _, err := client.ListQueueConsumers(context.Background(), AccountIdentifier(""), ListQueueConsumersParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingAccountID, err) } - _, err = client.ListQueueConsumers(context.Background(), AccountIdentifier(testAccountID), ListQueueConsumersParams{}) + _, _, err = client.ListQueueConsumers(context.Background(), AccountIdentifier(testAccountID), ListQueueConsumersParams{}) if assert.Error(t, err) { assert.Equal(t, ErrMissingQueueName, err) } - result, err := client.ListQueueConsumers(context.Background(), AccountIdentifier(testAccountID), ListQueueConsumersParams{QueueName: testQueueName}) + result, _, err := client.ListQueueConsumers(context.Background(), AccountIdentifier(testAccountID), ListQueueConsumersParams{QueueName: testQueueName}) if assert.NoError(t, err) { assert.Equal(t, 1, len(result)) assert.Equal(t, testQueueConsumer(), result[0])