diff --git a/autogpt_platform/backend/backend/data/integrations.py b/autogpt_platform/backend/backend/data/integrations.py
index f86ecd3a4e49..4d5197f693a8 100644
--- a/autogpt_platform/backend/backend/data/integrations.py
+++ b/autogpt_platform/backend/backend/data/integrations.py
@@ -144,25 +144,28 @@ class WebhookEventBus(AsyncRedisEventBus[WebhookEvent]):
def event_bus_name(self) -> str:
return "webhooks"
- async def publish(self, event: WebhookEvent):
- await self.publish_event(event, f"{event.webhook_id}/{event.event_type}")
- async def listen(
- self, webhook_id: str, event_type: Optional[str] = None
- ) -> AsyncGenerator[WebhookEvent, None]:
- async for event in self.listen_events(f"{webhook_id}/{event_type or '*'}"):
- yield event
-
-
-event_bus = WebhookEventBus()
+_webhook_event_bus = WebhookEventBus()
async def publish_webhook_event(event: WebhookEvent):
- await event_bus.publish(event)
+ await _webhook_event_bus.publish_event(
+ event, f"{event.webhook_id}/{event.event_type}"
+ )
-async def listen_for_webhook_event(
+async def listen_for_webhook_events(
webhook_id: str, event_type: Optional[str] = None
+) -> AsyncGenerator[WebhookEvent, None]:
+ async for event in _webhook_event_bus.listen_events(
+ f"{webhook_id}/{event_type or '*'}"
+ ):
+ yield event
+
+
+async def wait_for_webhook_event(
+ webhook_id: str, event_type: Optional[str] = None, timeout: Optional[float] = None
) -> WebhookEvent | None:
- async for event in event_bus.listen(webhook_id, event_type):
- return event # Only one event is expected
+ return await _webhook_event_bus.wait_for_event(
+ f"{webhook_id}/{event_type or '*'}", timeout
+ )
diff --git a/autogpt_platform/backend/backend/data/queue.py b/autogpt_platform/backend/backend/data/queue.py
index b6fa72d53046..8fbdb03cd6f0 100644
--- a/autogpt_platform/backend/backend/data/queue.py
+++ b/autogpt_platform/backend/backend/data/queue.py
@@ -1,8 +1,9 @@
+import asyncio
import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime
-from typing import Any, AsyncGenerator, Generator, Generic, TypeVar
+from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar
from pydantic import BaseModel
from redis.asyncio.client import PubSub as AsyncPubSub
@@ -48,12 +49,12 @@ def _deserialize_message(self, msg: Any, channel_key: str) -> M | None:
except Exception as e:
logger.error(f"Failed to parse event result from Redis {msg} {e}")
- def _subscribe(
+ def _get_pubsub_channel(
self, connection: redis.Redis | redis.AsyncRedis, channel_key: str
) -> tuple[PubSub | AsyncPubSub, str]:
- channel_name = f"{self.event_bus_name}/{channel_key}"
+ full_channel_name = f"{self.event_bus_name}/{channel_key}"
pubsub = connection.pubsub()
- return pubsub, channel_name
+ return pubsub, full_channel_name
class RedisEventBus(BaseRedisEventBus[M], ABC):
@@ -64,17 +65,19 @@ def connection(self) -> redis.Redis:
return redis.get_redis()
def publish_event(self, event: M, channel_key: str):
- message, channel_name = self._serialize_message(event, channel_key)
- self.connection.publish(channel_name, message)
+ message, full_channel_name = self._serialize_message(event, channel_key)
+ self.connection.publish(full_channel_name, message)
def listen_events(self, channel_key: str) -> Generator[M, None, None]:
- pubsub, channel_name = self._subscribe(self.connection, channel_key)
+ pubsub, full_channel_name = self._get_pubsub_channel(
+ self.connection, channel_key
+ )
assert isinstance(pubsub, PubSub)
if "*" in channel_key:
- pubsub.psubscribe(channel_name)
+ pubsub.psubscribe(full_channel_name)
else:
- pubsub.subscribe(channel_name)
+ pubsub.subscribe(full_channel_name)
for message in pubsub.listen():
if event := self._deserialize_message(message, channel_key):
@@ -89,19 +92,31 @@ async def connection(self) -> redis.AsyncRedis:
return await redis.get_redis_async()
async def publish_event(self, event: M, channel_key: str):
- message, channel_name = self._serialize_message(event, channel_key)
+ message, full_channel_name = self._serialize_message(event, channel_key)
connection = await self.connection
- await connection.publish(channel_name, message)
+ await connection.publish(full_channel_name, message)
async def listen_events(self, channel_key: str) -> AsyncGenerator[M, None]:
- pubsub, channel_name = self._subscribe(await self.connection, channel_key)
+ pubsub, full_channel_name = self._get_pubsub_channel(
+ await self.connection, channel_key
+ )
assert isinstance(pubsub, AsyncPubSub)
if "*" in channel_key:
- await pubsub.psubscribe(channel_name)
+ await pubsub.psubscribe(full_channel_name)
else:
- await pubsub.subscribe(channel_name)
+ await pubsub.subscribe(full_channel_name)
async for message in pubsub.listen():
if event := self._deserialize_message(message, channel_key):
yield event
+
+ async def wait_for_event(
+ self, channel_key: str, timeout: Optional[float] = None
+ ) -> M | None:
+ try:
+ return await asyncio.wait_for(
+ anext(aiter(self.listen_events(channel_key))), timeout
+ )
+ except TimeoutError:
+ return None
diff --git a/autogpt_platform/backend/backend/integrations/webhooks/base.py b/autogpt_platform/backend/backend/integrations/webhooks/base.py
index 61a07ce20353..b1c8a62ddc48 100644
--- a/autogpt_platform/backend/backend/integrations/webhooks/base.py
+++ b/autogpt_platform/backend/backend/integrations/webhooks/base.py
@@ -81,7 +81,9 @@ async def validate_payload(
# --8<-- [end:BaseWebhooksManager3]
# --8<-- [start:BaseWebhooksManager5]
- async def trigger_ping(self, webhook: integrations.Webhook) -> None:
+ async def trigger_ping(
+ self, webhook: integrations.Webhook, credentials: Credentials | None
+ ) -> None:
"""
Triggers a ping to the given webhook.
diff --git a/autogpt_platform/backend/backend/integrations/webhooks/github.py b/autogpt_platform/backend/backend/integrations/webhooks/github.py
index 2393437d209a..679e771246b7 100644
--- a/autogpt_platform/backend/backend/integrations/webhooks/github.py
+++ b/autogpt_platform/backend/backend/integrations/webhooks/github.py
@@ -58,10 +58,15 @@ async def validate_payload(
return payload, event_type
- async def trigger_ping(self, webhook: integrations.Webhook) -> None:
+ async def trigger_ping(
+ self, webhook: integrations.Webhook, credentials: Credentials | None
+ ) -> None:
+ if not credentials:
+ raise ValueError("Credentials are required but were not passed")
+
headers = {
**self.GITHUB_API_DEFAULT_HEADERS,
- "Authorization": f"Bearer {webhook.config.get('access_token')}",
+ "Authorization": credentials.bearer(),
}
repo, github_hook_id = webhook.resource, webhook.provider_webhook_id
diff --git a/autogpt_platform/backend/backend/server/integrations/router.py b/autogpt_platform/backend/backend/server/integrations/router.py
index 4220ac3c0311..34770b677d8d 100644
--- a/autogpt_platform/backend/backend/server/integrations/router.py
+++ b/autogpt_platform/backend/backend/server/integrations/router.py
@@ -9,8 +9,8 @@
WebhookEvent,
get_all_webhooks,
get_webhook,
- listen_for_webhook_event,
publish_webhook_event,
+ wait_for_webhook_event,
)
from backend.data.model import (
APIKeyCredentials,
@@ -300,18 +300,28 @@ async def webhook_ingress_generic(
)
-@router.post("/{provider}/webhooks/{webhook_id}/ping")
+@router.post("/webhooks/{webhook_id}/ping")
async def webhook_ping(
- provider: Annotated[str, Path(title="Provider where the webhook was registered")],
webhook_id: Annotated[str, Path(title="Our ID for the webhook")],
user_id: Annotated[str, Depends(get_user_id)], # require auth
):
- webhook_manager = WEBHOOK_MANAGERS_BY_NAME[provider]()
webhook = await get_webhook(webhook_id)
+ webhook_manager = WEBHOOK_MANAGERS_BY_NAME[webhook.provider]()
+
+ credentials = (
+ creds_manager.get(user_id, webhook.credentials_id)
+ if webhook.credentials_id
+ else None
+ )
+ try:
+ await webhook_manager.trigger_ping(webhook, credentials)
+ except NotImplementedError:
+ return False
+
+ if not await wait_for_webhook_event(webhook_id, event_type="ping", timeout=10):
+ raise HTTPException(status_code=504, detail="Webhook ping timed out")
- await webhook_manager.trigger_ping(webhook)
- if not await listen_for_webhook_event(webhook_id, event_type="ping"):
- raise HTTPException(status_code=500, detail="Webhook ping event not received")
+ return True
# --------------------------- UTILITIES ---------------------------- #
diff --git a/autogpt_platform/backend/pyproject.toml b/autogpt_platform/backend/pyproject.toml
index 2fc7d7f5269e..6d42fbfb7c7b 100644
--- a/autogpt_platform/backend/pyproject.toml
+++ b/autogpt_platform/backend/pyproject.toml
@@ -90,3 +90,6 @@ ignore_patterns = []
[tool.pytest.ini_options]
asyncio_mode = "auto"
+
+[tool.ruff]
+target-version = "py310"
diff --git a/autogpt_platform/frontend/src/app/providers.tsx b/autogpt_platform/frontend/src/app/providers.tsx
index 93db52de71c1..fe469ba0cef2 100644
--- a/autogpt_platform/frontend/src/app/providers.tsx
+++ b/autogpt_platform/frontend/src/app/providers.tsx
@@ -3,6 +3,7 @@
import * as React from "react";
import { ThemeProvider as NextThemesProvider } from "next-themes";
import { ThemeProviderProps } from "next-themes/dist/types";
+import { BackendAPIProvider } from "@/lib/autogpt-server-api";
import { TooltipProvider } from "@/components/ui/tooltip";
import SupabaseProvider from "@/components/SupabaseProvider";
import CredentialsProvider from "@/components/integrations/credentials-provider";
@@ -11,9 +12,11 @@ export function Providers({ children, ...props }: ThemeProviderProps) {
return (