Skip to content

Commit

Permalink
[tasks] testing: use fixtures for the queue (GoogleCloudPlatform#4049)
Browse files Browse the repository at this point in the history
fixes GoogleCloudPlatform#4045
fixes GoogleCloudPlatform#4044

I don't know why these tests started to fail, but anyways we'd better
use fixtures and temporary queues.
  • Loading branch information
Takashi Matsuo committed Jun 10, 2020
1 parent ec8282c commit 305460d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
24 changes: 22 additions & 2 deletions tasks/create_http_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,35 @@
# limitations under the License.

import os
import uuid

from google.cloud import tasks_v2
import pytest

import create_http_task

TEST_PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
TEST_LOCATION = os.getenv('TEST_QUEUE_LOCATION', 'us-central1')
TEST_QUEUE_NAME = os.getenv('TEST_QUEUE_NAME', 'my-queue')
TEST_QUEUE_NAME = f'my-queue-{uuid.uuid4().hex}'


@pytest.fixture()
def test_queue():
client = tasks_v2.CloudTasksClient()
parent = client.location_path(TEST_PROJECT_ID, TEST_LOCATION)
queue = {
# The fully qualified path to the queue
'name': client.queue_path(
TEST_PROJECT_ID, TEST_LOCATION, TEST_QUEUE_NAME),
}
q = client.create_queue(parent, queue)

yield q

client.delete_queue(q.name)


def test_create_http_task():
def test_create_http_task(test_queue):
url = 'https://example.com/task_handler'
result = create_http_task.create_http_task(
TEST_PROJECT_ID, TEST_QUEUE_NAME, TEST_LOCATION, url)
Expand Down
24 changes: 22 additions & 2 deletions tasks/create_http_task_with_token_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,37 @@
# limitations under the License.

import os
import uuid

from google.cloud import tasks_v2
import pytest

import create_http_task_with_token

TEST_PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')
TEST_LOCATION = os.getenv('TEST_QUEUE_LOCATION', 'us-central1')
TEST_QUEUE_NAME = os.getenv('TEST_QUEUE_NAME', 'my-queue')
TEST_QUEUE_NAME = f'my-queue-{uuid.uuid4().hex}'
TEST_SERVICE_ACCOUNT = (
'test-run-invoker@python-docs-samples-tests.iam.gserviceaccount.com')


def test_create_http_task_with_token():
@pytest.fixture()
def test_queue():
client = tasks_v2.CloudTasksClient()
parent = client.location_path(TEST_PROJECT_ID, TEST_LOCATION)
queue = {
# The fully qualified path to the queue
'name': client.queue_path(
TEST_PROJECT_ID, TEST_LOCATION, TEST_QUEUE_NAME),
}
q = client.create_queue(parent, queue)

yield q

client.delete_queue(q.name)


def test_create_http_task_with_token(test_queue):
url = 'https://example.com/task_handler'
result = create_http_task_with_token.create_http_task(TEST_PROJECT_ID,
TEST_QUEUE_NAME,
Expand Down

0 comments on commit 305460d

Please sign in to comment.