Skip to content
This repository has been archived by the owner on Nov 29, 2023. It is now read-only.

Commit

Permalink
Merge pull request #20 from dbluhm/feature/outbound-message-changes
Browse files Browse the repository at this point in the history
feat: outbound message in handle message
  • Loading branch information
swcurran authored Jul 5, 2023
2 parents a27cefe + 7f9cd7a commit c9f0ae6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ Covered in ./docker/README.md

First, install this plugin into your environment:

> **Note** Deployments of the main branch of this repository must be used with ACA-Py artifacts created **after** [ACA-Py PR #2170](https://github.com/hyperledger/aries-cloudagent-python/pull/2170)). If you are using an earlier ACA-Py release (e.g., version 0.8.2 and earlier), you **MUST** use the v0.0.1 tag of this repository.
```sh
$ pip install git+https://github.com/bcgov/aries-acapy-plugin-redis-events.git
```
Expand Down
19 changes: 13 additions & 6 deletions redis_queue/v1_0/outbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import json

import logging
from typing import List, Optional, Union

from aries_cloudagent.transport.wire_format import BaseWireFormat
from aries_cloudagent.core.profile import Profile
from aries_cloudagent.transport.outbound.base import (
BaseOutboundTransport,
OutboundTransportError,
QueuedOutboundMessage,
)
from aries_cloudagent.transport.wire_format import (
DIDCOMM_V0_MIME_TYPE,
Expand All @@ -33,15 +34,20 @@ class RedisOutboundQueue(BaseOutboundTransport):
schemes = ("redis",)
is_external = True

def __init__(self, root_profile: Profile):
def __init__(
self,
wire_format: BaseWireFormat,
root_profile: Profile,
):
"""Initialize base queue type."""
super().__init__(root_profile)
super().__init__(wire_format, root_profile)
self.outbound_config = (
get_config(root_profile.context.settings).outbound
or OutboundConfig.default()
)
LOGGER.info(
f"Setting up redis outbound queue with configuration: {self.outbound_config}"
"Setting up redis outbound queue with configuration: %s",
self.outbound_config,
)
self.redis = root_profile.inject_or(RedisCluster)
self.is_mediator = self.outbound_config.mediator_mode
Expand All @@ -63,12 +69,13 @@ async def stop(self):
async def handle_message(
self,
profile: Profile,
payload: Union[str, bytes],
outbound_message: QueuedOutboundMessage,
endpoint: str,
metadata: dict = None,
api_key: str = None,
):
"""Prepare and send message to external queue."""
payload = outbound_message.payload
if not endpoint:
raise OutboundTransportError("No endpoint provided")
headers = metadata or {}
Expand Down Expand Up @@ -105,4 +112,4 @@ async def handle_message(
message,
)
except (RedisError, RedisClusterException) as err:
LOGGER.exception(f"Error while pushing to Redis: {err}")
LOGGER.exception("Error while pushing to Redis: %s", err)

0 comments on commit c9f0ae6

Please sign in to comment.