Skip to content

Commit

Permalink
🐛 Fix endorser Nats connection going stale (#1122)
Browse files Browse the repository at this point in the history
* add max attempts env

* 🎨

* must be int

* reset to 15

* add temp debug log

* more temp debug logs

* dont reset nats

* revert reset nats

* remove temp debug logging

* add heartbeat to fetch

* import fetch time out error

* on fetch timeout continue

* on generic timeout re-subscribe

* update imports

* fix test

* add tests for health endpoints

* update health endpoints

* add check jetstream func for health checks

* fix test

* 🎨

* update endorser health endpoint

* ✨ Implement DID Exchange and DID Rotate methods and 🗑️ deprecate connections protocol (#1119)

* 🗑️ mark deprecated routes

* 🚧 initial implementation of new did-exchange routes

* 🎨 modify default extra settings example

* 🔧 Update default pylint config

* 🎨 update docstrings and available optional params

* 🚧 e2e test under construction

* ✅ fix up create request tests

* 🐛 fix protocol: /1.1 doesn't work ...

* 🐛 use_public_did must be false in accept-request

* ✅ working tests for accept-request

* 🎨 remove unused endpoint and update route names

* 🎨 update route names

* 🎨 accept reject reason sa body instead of param

* ⬆️ Use latest cloudcontroller

* ⬆️ Update lock files

* ✨ implement did-rotate endpoints

* 🔧 add max-positional-arguments to pylintrc

* ⬆️ use latest cloudcontroller

* ⬆️ Update lock files

* ✨ Use did-rotate/hangup in deletion of connection record (if using didexchange protocol)

* ✅ Fix deleting records when using oob connections

* ✅ assert connections are complete for both parties

* ✅ e2e tests for did-rotate

* ⬆️ Helmfile `0.169`, Helm `3.16.2`, Tailscale `1.76.0` (#1123)

* 📌 Pin `xk6` and plugin versions (#1124)

* `xk6-sse` isn't compatible with `k6>=0.53`
* Pin `xk6` and all plugins to the latest compatible versions
* Also bump Golang to `1.23`

* 📌 Explicitly pin k6 to `v0.52.0` (#1125)

* ⬆️ Update lock files

* 🎨

* ✅ 100% unit test coverage for new did-exchange and did-rotate methods

* ✅ fixed up delete connection test

* 🐛 Reconfigure ACAPY_AUTO_ACCEPT_REQUESTS for Faber after test completes

* ✅ add cleaning up of connection records, for regression fixtures not to get bloated

---------

Co-authored-by: Robbie Blaine <4052340+rblaine95@users.noreply.github.com>

* 💩 add sleep to avoid mysterious 500 error

---------

Co-authored-by: Mourits de Beer <31511766+ff137@users.noreply.github.com>
Co-authored-by: Robbie Blaine <4052340+rblaine95@users.noreply.github.com>
Co-authored-by: ff137 <ff137@proton.me>
  • Loading branch information
4 people authored Oct 17, 2024
1 parent c78bd72 commit c6b887e
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 10 deletions.
5 changes: 4 additions & 1 deletion app/services/onboarding/util/register_issuer_did.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import os
from logging import Logger

from aries_cloudcontroller import (
Expand All @@ -18,6 +19,8 @@
)
from shared import ACAPY_ENDORSER_ALIAS

MAX_ATTEMPTS = int(os.getenv("WAIT_ISSUER_DID_MAX_ATTEMPTS", "15"))


async def create_connection_with_endorser(
*,
Expand Down Expand Up @@ -241,7 +244,7 @@ async def wait_issuer_did_transaction_endorsed(
issuer_controller: AcaPyClient,
issuer_connection_id: str,
logger: Logger,
max_attempts: int = 15,
max_attempts: int = MAX_ATTEMPTS,
retry_delay: float = 1.0,
) -> None:
attempt = 0
Expand Down
3 changes: 3 additions & 0 deletions app/tests/e2e/test_did_exchange.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Optional

import pytest
Expand Down Expand Up @@ -99,6 +100,7 @@ async def test_create_did_exchange_request(
filter_map={"their_did": alice_did},
)
finally:
await asyncio.sleep(1) # Short sleep assists in avoiding 500 error
# Delete connection records:
await alice_member_client.delete(

Check failure on line 105 in app/tests/e2e/test_did_exchange.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_did_exchange.test_create_did_exchange_request[clean-clean-None-None-False]

fastapi.exceptions.HTTPException: 500: {"detail":"Internal Server Error"}
Raw output
self = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f0dfadb0>
url = '/v1/connections/ff0a100a-0a03-4c53-94ad-a9d5e863c5d6', kwargs = {}
response = <Response [500 Internal Server Error]>, code = 500
message = '{"detail":"Internal Server Error"}'
log_message = 'Tenant alice_QWPTB - HTTP DELETE `/v1/connections/ff0a100a-0a03-4c53-94ad-a9d5e863c5d6` failed. Status code: 500. Response: `{"detail":"Internal Server Error"}`.'

    async def delete(self, url: str, **kwargs) -> Response:
        try:
            response = await super().delete(url, **kwargs)
            if self.raise_status_error:
>               response.raise_for_status()

shared/util/rich_async_client.py:61: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Response [500 Internal Server Error]>

    def raise_for_status(self) -> Response:
        """
        Raise the `HTTPStatusError` if one occurred.
        """
        request = self._request
        if request is None:
            raise RuntimeError(
                "Cannot call `raise_for_status` as the request "
                "instance has not been set on this response."
            )
    
        if self.is_success:
            return self
    
        if self.has_redirect_location:
            message = (
                "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
                "Redirect location: '{0.headers[location]}'\n"
                "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
            )
        else:
            message = (
                "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
                "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
            )
    
        status_class = self.status_code // 100
        error_types = {
            1: "Informational response",
            3: "Redirect response",
            4: "Client error",
            5: "Server error",
        }
        error_type = error_types.get(status_class, "Invalid status code")
        message = message.format(self, error_type=error_type)
>       raise HTTPStatusError(message, request=request, response=self)
E       httpx.HTTPStatusError: Server error '500 Internal Server Error' for url 'https://governance-tenant-web.cloudapi.dev.didxtech.com/tenant/v1/connections/ff0a100a-0a03-4c53-94ad-a9d5e863c5d6'
E       For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500

/usr/local/lib/python3.12/site-packages/httpx/_models.py:763: HTTPStatusError

The above exception was the direct cause of the following exception:

alice_member_client = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f0dfadb0>
faber_client = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f0dfb260>
alice_acapy_client = <aries_cloudcontroller.acapy_client.AcaPyClient object at 0x7fe4f0df8380>
faber_acapy_client = <aries_cloudcontroller.acapy_client.AcaPyClient object at 0x7fe4f0df96a0>
use_did = None, use_did_method = None, use_public_did = False

    @pytest.mark.anyio
    @pytest.mark.parametrize(
        "use_did,use_did_method,use_public_did",
        [
            (None, None, False),
            (True, None, False),
            (None, "did:peer:2", False),
            (None, "did:peer:4", False),
            (True, "did:peer:4", False),
            (None, None, True),
        ],
    )
    async def test_create_did_exchange_request(
        alice_member_client: RichAsyncClient,
        faber_client: RichAsyncClient,
        alice_acapy_client: AcaPyClient,
        faber_acapy_client: AcaPyClient,
        use_did: Optional[str],
        use_did_method: Optional[str],
        use_public_did: bool,
    ):
        faber_public_did = await acapy_wallet.get_public_did(controller=faber_acapy_client)
    
        request_data = {"their_public_did": qualified_did_sov(faber_public_did.did)}
    
        if use_did:
            new_did = await acapy_wallet.create_did(controller=alice_acapy_client)
            request_data["use_did"] = new_did.did
    
        if use_did_method:
            request_data["use_did_method"] = use_did_method
    
        if use_public_did:
            request_data["use_public_did"] = use_public_did
    
        if use_public_did:  # Alice doesn't have a public DID
            with pytest.raises(HTTPException) as exc_info:
                response = await alice_member_client.post(
                    f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request",
                    params=request_data,
                )
            assert exc_info.value.status_code == 400
            assert exc_info.value.detail == """{"detail":"No public DID configured."}"""
    
        elif use_did and use_did_method:
            with pytest.raises(HTTPException) as exc_info:
                await alice_member_client.post(
                    f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request",
                    params=request_data,
                )
            assert exc_info.value.status_code == 400
            assert (
                exc_info.value.detail
                == """{"detail":"Cannot specify both use_did and use_did_method."}"""
            )
        else:
            response = await alice_member_client.post(
                f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request", params=request_data
            )
            assert response.status_code == 200
            connection_record = response.json()
            assert_that(connection_record).contains("connection_id", "state")
            assert_that(connection_record["state"]).is_equal_to("request-sent")
    
            alice_connection_id = connection_record["connection_id"]
            alice_did = connection_record["my_did"]
    
            try:
                # Due to auto-accepts, Alice's connection is complete
                assert await check_webhook_state(
                    alice_member_client,
                    topic="connections",
                    state="completed",
                    filter_map={"connection_id": alice_connection_id},
                )
                # Faber now has a complete connection too
                assert await check_webhook_state(
                    faber_client,
                    topic="connections",
                    state="completed",
                    filter_map={"their_did": alice_did},
                )
            finally:
                await asyncio.sleep(1)  # Short sleep assists in avoiding 500 error
                # Delete connection records:
>               await alice_member_client.delete(
                    f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
                )

app/tests/e2e/test_did_exchange.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f0dfadb0>
url = '/v1/connections/ff0a100a-0a03-4c53-94ad-a9d5e863c5d6', kwargs = {}
response = <Response [500 Internal Server Error]>, code = 500
message = '{"detail":"Internal Server Error"}'
log_message = 'Tenant alice_QWPTB - HTTP DELETE `/v1/connections/ff0a100a-0a03-4c53-94ad-a9d5e863c5d6` failed. Status code: 500. Response: `{"detail":"Internal Server Error"}`.'

    async def delete(self, url: str, **kwargs) -> Response:
        try:
            response = await super().delete(url, **kwargs)
            if self.raise_status_error:
                response.raise_for_status()
        except HTTPStatusError as e:
            code = e.response.status_code
            message = e.response.text
            log_message = f"{self.name} DELETE `{url}` failed. Status code: {code}. Response: `{message}`."
            logger.error(log_message)
    
>           raise HTTPException(status_code=code, detail=message) from e
E           fastapi.exceptions.HTTPException: 500: {"detail":"Internal Server Error"}

shared/util/rich_async_client.py:68: HTTPException

Check failure on line 105 in app/tests/e2e/test_did_exchange.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_did_exchange.test_create_did_exchange_request[clean-clean-None-did:peer:2-False]

fastapi.exceptions.HTTPException: 500: {"detail":"Internal Server Error"}
Raw output
self = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f199fe60>
url = '/v1/connections/d0efeb84-3728-47a7-848d-51820a31f2c8', kwargs = {}
response = <Response [500 Internal Server Error]>, code = 500
message = '{"detail":"Internal Server Error"}'
log_message = 'Tenant alice_DJYZH - HTTP DELETE `/v1/connections/d0efeb84-3728-47a7-848d-51820a31f2c8` failed. Status code: 500. Response: `{"detail":"Internal Server Error"}`.'

    async def delete(self, url: str, **kwargs) -> Response:
        try:
            response = await super().delete(url, **kwargs)
            if self.raise_status_error:
>               response.raise_for_status()

shared/util/rich_async_client.py:61: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Response [500 Internal Server Error]>

    def raise_for_status(self) -> Response:
        """
        Raise the `HTTPStatusError` if one occurred.
        """
        request = self._request
        if request is None:
            raise RuntimeError(
                "Cannot call `raise_for_status` as the request "
                "instance has not been set on this response."
            )
    
        if self.is_success:
            return self
    
        if self.has_redirect_location:
            message = (
                "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
                "Redirect location: '{0.headers[location]}'\n"
                "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
            )
        else:
            message = (
                "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
                "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
            )
    
        status_class = self.status_code // 100
        error_types = {
            1: "Informational response",
            3: "Redirect response",
            4: "Client error",
            5: "Server error",
        }
        error_type = error_types.get(status_class, "Invalid status code")
        message = message.format(self, error_type=error_type)
>       raise HTTPStatusError(message, request=request, response=self)
E       httpx.HTTPStatusError: Server error '500 Internal Server Error' for url 'https://governance-tenant-web.cloudapi.dev.didxtech.com/tenant/v1/connections/d0efeb84-3728-47a7-848d-51820a31f2c8'
E       For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500

/usr/local/lib/python3.12/site-packages/httpx/_models.py:763: HTTPStatusError

The above exception was the direct cause of the following exception:

alice_member_client = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f199fe60>
faber_client = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f0dfb260>
alice_acapy_client = <aries_cloudcontroller.acapy_client.AcaPyClient object at 0x7fe4f199ff20>
faber_acapy_client = <aries_cloudcontroller.acapy_client.AcaPyClient object at 0x7fe4f199ebd0>
use_did = None, use_did_method = 'did:peer:2', use_public_did = False

    @pytest.mark.anyio
    @pytest.mark.parametrize(
        "use_did,use_did_method,use_public_did",
        [
            (None, None, False),
            (True, None, False),
            (None, "did:peer:2", False),
            (None, "did:peer:4", False),
            (True, "did:peer:4", False),
            (None, None, True),
        ],
    )
    async def test_create_did_exchange_request(
        alice_member_client: RichAsyncClient,
        faber_client: RichAsyncClient,
        alice_acapy_client: AcaPyClient,
        faber_acapy_client: AcaPyClient,
        use_did: Optional[str],
        use_did_method: Optional[str],
        use_public_did: bool,
    ):
        faber_public_did = await acapy_wallet.get_public_did(controller=faber_acapy_client)
    
        request_data = {"their_public_did": qualified_did_sov(faber_public_did.did)}
    
        if use_did:
            new_did = await acapy_wallet.create_did(controller=alice_acapy_client)
            request_data["use_did"] = new_did.did
    
        if use_did_method:
            request_data["use_did_method"] = use_did_method
    
        if use_public_did:
            request_data["use_public_did"] = use_public_did
    
        if use_public_did:  # Alice doesn't have a public DID
            with pytest.raises(HTTPException) as exc_info:
                response = await alice_member_client.post(
                    f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request",
                    params=request_data,
                )
            assert exc_info.value.status_code == 400
            assert exc_info.value.detail == """{"detail":"No public DID configured."}"""
    
        elif use_did and use_did_method:
            with pytest.raises(HTTPException) as exc_info:
                await alice_member_client.post(
                    f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request",
                    params=request_data,
                )
            assert exc_info.value.status_code == 400
            assert (
                exc_info.value.detail
                == """{"detail":"Cannot specify both use_did and use_did_method."}"""
            )
        else:
            response = await alice_member_client.post(
                f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request", params=request_data
            )
            assert response.status_code == 200
            connection_record = response.json()
            assert_that(connection_record).contains("connection_id", "state")
            assert_that(connection_record["state"]).is_equal_to("request-sent")
    
            alice_connection_id = connection_record["connection_id"]
            alice_did = connection_record["my_did"]
    
            try:
                # Due to auto-accepts, Alice's connection is complete
                assert await check_webhook_state(
                    alice_member_client,
                    topic="connections",
                    state="completed",
                    filter_map={"connection_id": alice_connection_id},
                )
                # Faber now has a complete connection too
                assert await check_webhook_state(
                    faber_client,
                    topic="connections",
                    state="completed",
                    filter_map={"their_did": alice_did},
                )
            finally:
                await asyncio.sleep(1)  # Short sleep assists in avoiding 500 error
                # Delete connection records:
>               await alice_member_client.delete(
                    f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
                )

app/tests/e2e/test_did_exchange.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f199fe60>
url = '/v1/connections/d0efeb84-3728-47a7-848d-51820a31f2c8', kwargs = {}
response = <Response [500 Internal Server Error]>, code = 500
message = '{"detail":"Internal Server Error"}'
log_message = 'Tenant alice_DJYZH - HTTP DELETE `/v1/connections/d0efeb84-3728-47a7-848d-51820a31f2c8` failed. Status code: 500. Response: `{"detail":"Internal Server Error"}`.'

    async def delete(self, url: str, **kwargs) -> Response:
        try:
            response = await super().delete(url, **kwargs)
            if self.raise_status_error:
                response.raise_for_status()
        except HTTPStatusError as e:
            code = e.response.status_code
            message = e.response.text
            log_message = f"{self.name} DELETE `{url}` failed. Status code: {code}. Response: `{message}`."
            logger.error(log_message)
    
>           raise HTTPException(status_code=code, detail=message) from e
E           fastapi.exceptions.HTTPException: 500: {"detail":"Internal Server Error"}

shared/util/rich_async_client.py:68: HTTPException

Check failure on line 105 in app/tests/e2e/test_did_exchange.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_did_exchange.test_create_did_exchange_request[clean-clean-None-did:peer:4-False]

fastapi.exceptions.HTTPException: 500: {"detail":"Internal Server Error"}
Raw output
self = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f19ce840>
url = '/v1/connections/3424f5f7-965c-4e81-ae19-28df0afd46c3', kwargs = {}
response = <Response [500 Internal Server Error]>, code = 500
message = '{"detail":"Internal Server Error"}'
log_message = 'Tenant alice_CWQGA - HTTP DELETE `/v1/connections/3424f5f7-965c-4e81-ae19-28df0afd46c3` failed. Status code: 500. Response: `{"detail":"Internal Server Error"}`.'

    async def delete(self, url: str, **kwargs) -> Response:
        try:
            response = await super().delete(url, **kwargs)
            if self.raise_status_error:
>               response.raise_for_status()

shared/util/rich_async_client.py:61: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Response [500 Internal Server Error]>

    def raise_for_status(self) -> Response:
        """
        Raise the `HTTPStatusError` if one occurred.
        """
        request = self._request
        if request is None:
            raise RuntimeError(
                "Cannot call `raise_for_status` as the request "
                "instance has not been set on this response."
            )
    
        if self.is_success:
            return self
    
        if self.has_redirect_location:
            message = (
                "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
                "Redirect location: '{0.headers[location]}'\n"
                "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
            )
        else:
            message = (
                "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
                "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
            )
    
        status_class = self.status_code // 100
        error_types = {
            1: "Informational response",
            3: "Redirect response",
            4: "Client error",
            5: "Server error",
        }
        error_type = error_types.get(status_class, "Invalid status code")
        message = message.format(self, error_type=error_type)
>       raise HTTPStatusError(message, request=request, response=self)
E       httpx.HTTPStatusError: Server error '500 Internal Server Error' for url 'https://governance-tenant-web.cloudapi.dev.didxtech.com/tenant/v1/connections/3424f5f7-965c-4e81-ae19-28df0afd46c3'
E       For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500

/usr/local/lib/python3.12/site-packages/httpx/_models.py:763: HTTPStatusError

The above exception was the direct cause of the following exception:

alice_member_client = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f19ce840>
faber_client = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f0dfb260>
alice_acapy_client = <aries_cloudcontroller.acapy_client.AcaPyClient object at 0x7fe4f19cc620>
faber_acapy_client = <aries_cloudcontroller.acapy_client.AcaPyClient object at 0x7fe4f19cda00>
use_did = None, use_did_method = 'did:peer:4', use_public_did = False

    @pytest.mark.anyio
    @pytest.mark.parametrize(
        "use_did,use_did_method,use_public_did",
        [
            (None, None, False),
            (True, None, False),
            (None, "did:peer:2", False),
            (None, "did:peer:4", False),
            (True, "did:peer:4", False),
            (None, None, True),
        ],
    )
    async def test_create_did_exchange_request(
        alice_member_client: RichAsyncClient,
        faber_client: RichAsyncClient,
        alice_acapy_client: AcaPyClient,
        faber_acapy_client: AcaPyClient,
        use_did: Optional[str],
        use_did_method: Optional[str],
        use_public_did: bool,
    ):
        faber_public_did = await acapy_wallet.get_public_did(controller=faber_acapy_client)
    
        request_data = {"their_public_did": qualified_did_sov(faber_public_did.did)}
    
        if use_did:
            new_did = await acapy_wallet.create_did(controller=alice_acapy_client)
            request_data["use_did"] = new_did.did
    
        if use_did_method:
            request_data["use_did_method"] = use_did_method
    
        if use_public_did:
            request_data["use_public_did"] = use_public_did
    
        if use_public_did:  # Alice doesn't have a public DID
            with pytest.raises(HTTPException) as exc_info:
                response = await alice_member_client.post(
                    f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request",
                    params=request_data,
                )
            assert exc_info.value.status_code == 400
            assert exc_info.value.detail == """{"detail":"No public DID configured."}"""
    
        elif use_did and use_did_method:
            with pytest.raises(HTTPException) as exc_info:
                await alice_member_client.post(
                    f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request",
                    params=request_data,
                )
            assert exc_info.value.status_code == 400
            assert (
                exc_info.value.detail
                == """{"detail":"Cannot specify both use_did and use_did_method."}"""
            )
        else:
            response = await alice_member_client.post(
                f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request", params=request_data
            )
            assert response.status_code == 200
            connection_record = response.json()
            assert_that(connection_record).contains("connection_id", "state")
            assert_that(connection_record["state"]).is_equal_to("request-sent")
    
            alice_connection_id = connection_record["connection_id"]
            alice_did = connection_record["my_did"]
    
            try:
                # Due to auto-accepts, Alice's connection is complete
                assert await check_webhook_state(
                    alice_member_client,
                    topic="connections",
                    state="completed",
                    filter_map={"connection_id": alice_connection_id},
                )
                # Faber now has a complete connection too
                assert await check_webhook_state(
                    faber_client,
                    topic="connections",
                    state="completed",
                    filter_map={"their_did": alice_did},
                )
            finally:
                await asyncio.sleep(1)  # Short sleep assists in avoiding 500 error
                # Delete connection records:
>               await alice_member_client.delete(
                    f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
                )

app/tests/e2e/test_did_exchange.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f19ce840>
url = '/v1/connections/3424f5f7-965c-4e81-ae19-28df0afd46c3', kwargs = {}
response = <Response [500 Internal Server Error]>, code = 500
message = '{"detail":"Internal Server Error"}'
log_message = 'Tenant alice_CWQGA - HTTP DELETE `/v1/connections/3424f5f7-965c-4e81-ae19-28df0afd46c3` failed. Status code: 500. Response: `{"detail":"Internal Server Error"}`.'

    async def delete(self, url: str, **kwargs) -> Response:
        try:
            response = await super().delete(url, **kwargs)
            if self.raise_status_error:
                response.raise_for_status()
        except HTTPStatusError as e:
            code = e.response.status_code
            message = e.response.text
            log_message = f"{self.name} DELETE `{url}` failed. Status code: {code}. Response: `{message}`."
            logger.error(log_message)
    
>           raise HTTPException(status_code=code, detail=message) from e
E           fastapi.exceptions.HTTPException: 500: {"detail":"Internal Server Error"}

shared/util/rich_async_client.py:68: HTTPException
f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
Expand Down Expand Up @@ -171,6 +173,7 @@ async def test_accept_did_exchange_invitation(
filter_map={"connection_id": faber_connection_id},
)
finally:
await asyncio.sleep(1) # Short sleep assists in avoiding 500 error
# Delete connection records:
await alice_member_client.delete(

Check failure on line 178 in app/tests/e2e/test_did_exchange.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_did_exchange.test_accept_did_exchange_invitation[clean-clean-False]

fastapi.exceptions.HTTPException: 500: {"detail":"Internal Server Error"}
Raw output
self = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f19873e0>
url = '/v1/connections/0f009581-c318-471a-bc58-278b3e5654a9', kwargs = {}
response = <Response [500 Internal Server Error]>, code = 500
message = '{"detail":"Internal Server Error"}'
log_message = 'Tenant alice_UYGDX - HTTP DELETE `/v1/connections/0f009581-c318-471a-bc58-278b3e5654a9` failed. Status code: 500. Response: `{"detail":"Internal Server Error"}`.'

    async def delete(self, url: str, **kwargs) -> Response:
        try:
            response = await super().delete(url, **kwargs)
            if self.raise_status_error:
>               response.raise_for_status()

shared/util/rich_async_client.py:61: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Response [500 Internal Server Error]>

    def raise_for_status(self) -> Response:
        """
        Raise the `HTTPStatusError` if one occurred.
        """
        request = self._request
        if request is None:
            raise RuntimeError(
                "Cannot call `raise_for_status` as the request "
                "instance has not been set on this response."
            )
    
        if self.is_success:
            return self
    
        if self.has_redirect_location:
            message = (
                "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
                "Redirect location: '{0.headers[location]}'\n"
                "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
            )
        else:
            message = (
                "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
                "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
            )
    
        status_class = self.status_code // 100
        error_types = {
            1: "Informational response",
            3: "Redirect response",
            4: "Client error",
            5: "Server error",
        }
        error_type = error_types.get(status_class, "Invalid status code")
        message = message.format(self, error_type=error_type)
>       raise HTTPStatusError(message, request=request, response=self)
E       httpx.HTTPStatusError: Server error '500 Internal Server Error' for url 'https://governance-tenant-web.cloudapi.dev.didxtech.com/tenant/v1/connections/0f009581-c318-471a-bc58-278b3e5654a9'
E       For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500

/usr/local/lib/python3.12/site-packages/httpx/_models.py:763: HTTPStatusError

The above exception was the direct cause of the following exception:

alice_member_client = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f19873e0>
faber_client = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f0dfb260>
tenant_admin_client = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f1985430>
faber_acapy_client = <aries_cloudcontroller.acapy_client.AcaPyClient object at 0x7fe4f1985cd0>
use_public_did = False

    @pytest.mark.anyio
    @pytest.mark.parametrize("use_public_did", [False])
    async def test_accept_did_exchange_invitation(
        alice_member_client: RichAsyncClient,
        faber_client: RichAsyncClient,
        tenant_admin_client: RichAsyncClient,
        faber_acapy_client: AcaPyClient,
        use_public_did: bool,
    ):
        # First disable auto-accept invites for Faber
        faber_wallet_id = get_wallet_id_from_async_client(client=faber_client)
        await tenant_admin_client.put(
            f"{TENANTS_BASE_PATH}/{faber_wallet_id}",
            json={"extra_settings": {"ACAPY_AUTO_ACCEPT_REQUESTS": False}},
        )
    
        faber_public_did = await acapy_wallet.get_public_did(controller=faber_acapy_client)
    
        request_data = {"their_public_did": qualified_did_sov(faber_public_did.did)}
    
        alice_create_request_response = await alice_member_client.post(
            f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request", params=request_data
        )
        alice_create_request_response = alice_create_request_response.json()
    
        alice_connection_id = alice_create_request_response["connection_id"]
        alice_did = alice_create_request_response["my_did"]
    
        try:
            faber_connection_request_received_event = await check_webhook_state(
                faber_client,
                topic="connections",
                state="request-received",
                filter_map={"their_did": alice_did},
            )
    
            faber_connection_id = faber_connection_request_received_event["connection_id"]
    
            accept_params = {
                "connection_id": faber_connection_id,
                "use_public_did": use_public_did,
            }
    
            faber_accept_request_response = await faber_client.post(
                f"{CONNECTIONS_BASE_PATH}/did-exchange/accept-request", params=accept_params
            )
            assert faber_accept_request_response.status_code == 200
            accept_response = faber_accept_request_response.json()
            assert accept_response["state"] == "response-sent"
    
            # Now Alice's connection is complete
            assert await check_webhook_state(
                alice_member_client,
                topic="connections",
                state="completed",
                filter_map={"connection_id": alice_connection_id},
            )
    
            # And Faber's connection is complete
            assert await check_webhook_state(
                faber_client,
                topic="connections",
                state="completed",
                filter_map={"connection_id": faber_connection_id},
            )
        finally:
            await asyncio.sleep(1)  # Short sleep assists in avoiding 500 error
            # Delete connection records:
>           await alice_member_client.delete(
                f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
            )

app/tests/e2e/test_did_exchange.py:178: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <shared.util.rich_async_client.RichAsyncClient object at 0x7fe4f19873e0>
url = '/v1/connections/0f009581-c318-471a-bc58-278b3e5654a9', kwargs = {}
response = <Response [500 Internal Server Error]>, code = 500
message = '{"detail":"Internal Server Error"}'
log_message = 'Tenant alice_UYGDX - HTTP DELETE `/v1/connections/0f009581-c318-471a-bc58-278b3e5654a9` failed. Status code: 500. Response: `{"detail":"Internal Server Error"}`.'

    async def delete(self, url: str, **kwargs) -> Response:
        try:
            response = await super().delete(url, **kwargs)
            if self.raise_status_error:
                response.raise_for_status()
        except HTTPStatusError as e:
            code = e.response.status_code
            message = e.response.text
            log_message = f"{self.name} DELETE `{url}` failed. Status code: {code}. Response: `{message}`."
            logger.error(log_message)
    
>           raise HTTPException(status_code=code, detail=message) from e
E           fastapi.exceptions.HTTPException: 500: {"detail":"Internal Server Error"}

shared/util/rich_async_client.py:68: HTTPException
f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ services:
env_file:
- environments/endorser/endorser.env
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:3009/health || exit 1"]
test: ["CMD-SHELL", "curl -f http://localhost:3009/health/ready || exit 1"]
interval: 30s
timeout: 10s
retries: 3
Expand Down
33 changes: 32 additions & 1 deletion endorser/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
from contextlib import asynccontextmanager

Expand Down Expand Up @@ -58,7 +59,7 @@ async def scalar_html():
)


@app.get("/health")
@app.get("/health/live")
@inject
async def health_check(
endorsement_processor: EndorsementProcessor = Depends(
Expand All @@ -71,3 +72,33 @@ async def health_check(
raise HTTPException(
status_code=503, detail="One or more background tasks are not running."
)


@app.get("/health/ready")
@inject
async def health_ready(
endorsement_processor: EndorsementProcessor = Depends(
Provide[Container.endorsement_processor]
),
):
try:
jetstream_status = await asyncio.wait_for(
endorsement_processor.check_jetstream(), timeout=5.0
)
except asyncio.TimeoutError:
raise HTTPException(
status_code=503,
detail={"status": "not ready", "error": "JetStream health check timed out"},
)
except Exception as e: # pylint: disable=W0718
raise HTTPException(
status_code=500, detail={"status": "error", "error": str(e)}
)

if jetstream_status["is_working"]:
return {"status": "ready", "jetstream": jetstream_status}
else:
raise HTTPException(
status_code=503,
detail={"status": "not ready", "jetstream": "JetStream not ready"},
)
25 changes: 22 additions & 3 deletions endorser/services/endorsement_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from aries_cloudcontroller import AcaPyClient
from nats.errors import BadSubscriptionError, Error, TimeoutError
from nats.js.client import JetStreamContext
from nats.js.errors import FetchTimeoutError

from endorser.util.endorsement import accept_endorsement, should_accept_endorsement
from shared.constants import (
Expand Down Expand Up @@ -85,7 +86,7 @@ async def _process_endorsement_requests(self) -> NoReturn:
subscription = await self._subscribe()
while True:
try:
messages = await subscription.fetch(batch=1, timeout=60)
messages = await subscription.fetch(batch=1, timeout=60, heartbeat=1)
for message in messages:
message_subject = message.subject
message_data = message.data.decode()
Expand All @@ -103,9 +104,13 @@ async def _process_endorsement_requests(self) -> NoReturn:
)
finally:
await message.ack()
except TimeoutError:
logger.trace("Timeout fetching messages continuing...")
except FetchTimeoutError:
logger.trace("FetchTimeoutError continuing...")
await asyncio.sleep(0.1)
except TimeoutError as e:
logger.warning("Timeout error fetching messages re-subscribing: {}", e)
await subscription.unsubscribe()
subscription = await self._subscribe()
except Exception: # pylint: disable=W0718
logger.exception("Unexpected error in endorsement processing loop")
await asyncio.sleep(2)
Expand Down Expand Up @@ -192,3 +197,17 @@ async def _subscribe(self) -> JetStreamContext.PullSubscription:
logger.debug("Subscribed to NATS subject")

return subscription

async def check_jetstream(self):
try:
account_info = await self.jetstream.account_info()
is_working = account_info.streams > 0
logger.trace("JetStream check completed. Is working: {}", is_working)
return {
"is_working": is_working,
"streams_count": account_info.streams,
"consumers_count": account_info.consumers,
}
except Exception: # pylint: disable=W0718
logger.exception("Caught exception while checking jetstream status")
return {"is_working": False}
3 changes: 2 additions & 1 deletion endorser/tests/test_endorser_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from nats.aio.client import Client as NATS
from nats.errors import BadSubscriptionError, Error, TimeoutError
from nats.js.client import JetStreamContext
from nats.js.errors import FetchTimeoutError

from endorser.services.endorsement_processor import EndorsementProcessor
from shared.constants import (
Expand Down Expand Up @@ -128,7 +129,7 @@ async def test_process_endorsement_requests_timeout(
mock_nats_client.pull_subscribe.return_value = mock_subscription

# Simulate a timeout, then a CancelledError to stop the loop
mock_subscription.fetch.side_effect = [TimeoutError, asyncio.CancelledError]
mock_subscription.fetch.side_effect = [FetchTimeoutError, asyncio.CancelledError]

# Test
with patch("asyncio.sleep") as mock_sleep:
Expand Down
95 changes: 92 additions & 3 deletions endorser/tests/test_main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
from unittest.mock import AsyncMock, MagicMock, Mock, patch

import pytest
from fastapi import FastAPI, HTTPException

from endorser.main import app, app_lifespan, health_check
from endorser.main import app, app_lifespan, health_check, health_ready
from endorser.services.endorsement_processor import EndorsementProcessor


def test_create_app():
Expand All @@ -14,8 +16,9 @@ def test_create_app():
# Get all routes in app
routes = [route.path for route in app.routes]

expected_routes = "/health"
assert expected_routes in routes
expected_routes = ["/health/live", "/health/ready", "/docs"]
for route in expected_routes:
assert route in routes


@pytest.mark.anyio
Expand Down Expand Up @@ -63,3 +66,89 @@ async def test_health_check_unhealthy():
await health_check(endorsement_processor=endorsement_processor_mock)
assert exc_info.value.status_code == 503
assert exc_info.value.detail == "One or more background tasks are not running."


@pytest.mark.anyio
async def test_health_ready_success():
endorsement_processor_mock = AsyncMock(spec=EndorsementProcessor)

endorsement_processor_mock.check_jetstream.return_value = {
"is_working": True,
"streams_count": 1,
"consumers_count": 1,
}

response = await health_ready(endorsement_processor=endorsement_processor_mock)

assert response == {
"status": "ready",
"jetstream": {"is_working": True, "streams_count": 1, "consumers_count": 1},
}


@pytest.mark.anyio
async def test_health_ready_jetstream_not_working():
endorsement_processor_mock = AsyncMock(spec=EndorsementProcessor)

endorsement_processor_mock.check_jetstream.return_value = {
"is_working": False,
"error": "No streams available",
}

with pytest.raises(HTTPException) as exc_info:
await health_ready(endorsement_processor=endorsement_processor_mock)

assert exc_info.value.status_code == 503
assert exc_info.value.detail == {
"status": "not ready",
"jetstream": "JetStream not ready",
}


@pytest.mark.anyio
async def test_health_ready_timeout():
endorsement_processor_mock = AsyncMock(spec=EndorsementProcessor)

endorsement_processor_mock.check_jetstream.side_effect = asyncio.TimeoutError()

with pytest.raises(HTTPException) as exc_info:
await health_ready(endorsement_processor=endorsement_processor_mock)

assert exc_info.value.status_code == 503
assert exc_info.value.detail == {
"status": "not ready",
"error": "JetStream health check timed out",
}


@pytest.mark.anyio
async def test_health_ready_unexpected_error():
endorsement_processor_mock = AsyncMock(spec=EndorsementProcessor)

endorsement_processor_mock.check_jetstream.side_effect = Exception(
"Unexpected error"
)

with pytest.raises(HTTPException) as exc_info:
await health_ready(endorsement_processor=endorsement_processor_mock)

assert exc_info.value.status_code == 500
assert exc_info.value.detail == {"status": "error", "error": "Unexpected error"}


@pytest.mark.anyio
async def test_health_ready_with_timeout():
endorsement_processor_mock = AsyncMock(spec=EndorsementProcessor)

endorsement_processor_mock.check_jetstream.side_effect = (
asyncio.TimeoutError
) # Simulate a slow response

with pytest.raises(HTTPException) as exc_info:
await health_ready(endorsement_processor=endorsement_processor_mock)

assert exc_info.value.status_code == 503
assert exc_info.value.detail == {
"status": "not ready",
"error": "JetStream health check timed out",
}

0 comments on commit c6b887e

Please sign in to comment.