Skip to content

Commit

Permalink
feat(blocks): Add webhook block status indicator (#8838)
Browse files Browse the repository at this point in the history
- Resolves #8743
- Follow-up to #8358

### Demo


https://github.com/user-attachments/assets/f983dfa2-2dc2-4ab0-8373-e768ba17e6f7

### Changes 🏗️

- feat(frontend): Add webhook status indicator on `CustomNode`
   - Add `webhookId` to frontend node data model

- fix(backend): Fix webhook ping endpoint
   - Remove `provider` path parameter
   - Fix return values and error handling
   - Fix `WebhooksManager.trigger_ping(..)`
      - Add `credentials` parameter
      - Fix usage of credentials
   - Fix `.data.integrations.wait_for_webhook_event(..)`
      - Add `AsyncRedisEventBus.wait_for_event(..)`

- feat(frontend): Add `BackendAPIProvider` + `useBackendAPI`

- feat(frontend): Improve layout of node header

    Before:

![image](https://github.com/user-attachments/assets/17a33b94-65f0-4e34-a47d-2dd321edecae)
    After:

![image](https://github.com/user-attachments/assets/64afb1e4-e3f2-4ca9-8961-f1245f25477f)

- refactor(backend): Clean up `.data.integrations`
- refactor(backend): Fix naming in `.data.queue` for understandability

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Add webhook block, save -> gray indicator
  - [x] Add necessary info to webhook block, save -> green indicator
  - [x] Remove necessary info, save -> gray indicator

---------

Co-authored-by: Nicholas Tindle <nicholas.tindle@agpt.co>
  • Loading branch information
Pwuts and ntindle authored Dec 5, 2024
1 parent 6b742d1 commit 64f5e60
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 76 deletions.
31 changes: 17 additions & 14 deletions autogpt_platform/backend/backend/data/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,25 +144,28 @@ class WebhookEventBus(AsyncRedisEventBus[WebhookEvent]):
def event_bus_name(self) -> str:
return "webhooks"

async def publish(self, event: WebhookEvent):
await self.publish_event(event, f"{event.webhook_id}/{event.event_type}")

async def listen(
self, webhook_id: str, event_type: Optional[str] = None
) -> AsyncGenerator[WebhookEvent, None]:
async for event in self.listen_events(f"{webhook_id}/{event_type or '*'}"):
yield event


event_bus = WebhookEventBus()
_webhook_event_bus = WebhookEventBus()


async def publish_webhook_event(event: WebhookEvent):
await event_bus.publish(event)
await _webhook_event_bus.publish_event(
event, f"{event.webhook_id}/{event.event_type}"
)


async def listen_for_webhook_event(
async def listen_for_webhook_events(
webhook_id: str, event_type: Optional[str] = None
) -> AsyncGenerator[WebhookEvent, None]:
async for event in _webhook_event_bus.listen_events(
f"{webhook_id}/{event_type or '*'}"
):
yield event


async def wait_for_webhook_event(
webhook_id: str, event_type: Optional[str] = None, timeout: Optional[float] = None
) -> WebhookEvent | None:
async for event in event_bus.listen(webhook_id, event_type):
return event # Only one event is expected
return await _webhook_event_bus.wait_for_event(
f"{webhook_id}/{event_type or '*'}", timeout
)
43 changes: 29 additions & 14 deletions autogpt_platform/backend/backend/data/queue.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, AsyncGenerator, Generator, Generic, TypeVar
from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar

from pydantic import BaseModel
from redis.asyncio.client import PubSub as AsyncPubSub
Expand Down Expand Up @@ -48,12 +49,12 @@ def _deserialize_message(self, msg: Any, channel_key: str) -> M | None:
except Exception as e:
logger.error(f"Failed to parse event result from Redis {msg} {e}")

def _subscribe(
def _get_pubsub_channel(
self, connection: redis.Redis | redis.AsyncRedis, channel_key: str
) -> tuple[PubSub | AsyncPubSub, str]:
channel_name = f"{self.event_bus_name}/{channel_key}"
full_channel_name = f"{self.event_bus_name}/{channel_key}"
pubsub = connection.pubsub()
return pubsub, channel_name
return pubsub, full_channel_name


class RedisEventBus(BaseRedisEventBus[M], ABC):
Expand All @@ -64,17 +65,19 @@ def connection(self) -> redis.Redis:
return redis.get_redis()

def publish_event(self, event: M, channel_key: str):
message, channel_name = self._serialize_message(event, channel_key)
self.connection.publish(channel_name, message)
message, full_channel_name = self._serialize_message(event, channel_key)
self.connection.publish(full_channel_name, message)

def listen_events(self, channel_key: str) -> Generator[M, None, None]:
pubsub, channel_name = self._subscribe(self.connection, channel_key)
pubsub, full_channel_name = self._get_pubsub_channel(
self.connection, channel_key
)
assert isinstance(pubsub, PubSub)

if "*" in channel_key:
pubsub.psubscribe(channel_name)
pubsub.psubscribe(full_channel_name)
else:
pubsub.subscribe(channel_name)
pubsub.subscribe(full_channel_name)

for message in pubsub.listen():
if event := self._deserialize_message(message, channel_key):
Expand All @@ -89,19 +92,31 @@ async def connection(self) -> redis.AsyncRedis:
return await redis.get_redis_async()

async def publish_event(self, event: M, channel_key: str):
message, channel_name = self._serialize_message(event, channel_key)
message, full_channel_name = self._serialize_message(event, channel_key)
connection = await self.connection
await connection.publish(channel_name, message)
await connection.publish(full_channel_name, message)

async def listen_events(self, channel_key: str) -> AsyncGenerator[M, None]:
pubsub, channel_name = self._subscribe(await self.connection, channel_key)
pubsub, full_channel_name = self._get_pubsub_channel(
await self.connection, channel_key
)
assert isinstance(pubsub, AsyncPubSub)

if "*" in channel_key:
await pubsub.psubscribe(channel_name)
await pubsub.psubscribe(full_channel_name)
else:
await pubsub.subscribe(channel_name)
await pubsub.subscribe(full_channel_name)

async for message in pubsub.listen():
if event := self._deserialize_message(message, channel_key):
yield event

async def wait_for_event(
self, channel_key: str, timeout: Optional[float] = None
) -> M | None:
try:
return await asyncio.wait_for(
anext(aiter(self.listen_events(channel_key))), timeout
)
except TimeoutError:
return None
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ async def validate_payload(
# --8<-- [end:BaseWebhooksManager3]

# --8<-- [start:BaseWebhooksManager5]
async def trigger_ping(self, webhook: integrations.Webhook) -> None:
async def trigger_ping(
self, webhook: integrations.Webhook, credentials: Credentials | None
) -> None:
"""
Triggers a ping to the given webhook.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,15 @@ async def validate_payload(

return payload, event_type

async def trigger_ping(self, webhook: integrations.Webhook) -> None:
async def trigger_ping(
self, webhook: integrations.Webhook, credentials: Credentials | None
) -> None:
if not credentials:
raise ValueError("Credentials are required but were not passed")

headers = {
**self.GITHUB_API_DEFAULT_HEADERS,
"Authorization": f"Bearer {webhook.config.get('access_token')}",
"Authorization": credentials.bearer(),
}

repo, github_hook_id = webhook.resource, webhook.provider_webhook_id
Expand Down
24 changes: 17 additions & 7 deletions autogpt_platform/backend/backend/server/integrations/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
WebhookEvent,
get_all_webhooks,
get_webhook,
listen_for_webhook_event,
publish_webhook_event,
wait_for_webhook_event,
)
from backend.data.model import (
APIKeyCredentials,
Expand Down Expand Up @@ -300,18 +300,28 @@ async def webhook_ingress_generic(
)


@router.post("/{provider}/webhooks/{webhook_id}/ping")
@router.post("/webhooks/{webhook_id}/ping")
async def webhook_ping(
provider: Annotated[str, Path(title="Provider where the webhook was registered")],
webhook_id: Annotated[str, Path(title="Our ID for the webhook")],
user_id: Annotated[str, Depends(get_user_id)], # require auth
):
webhook_manager = WEBHOOK_MANAGERS_BY_NAME[provider]()
webhook = await get_webhook(webhook_id)
webhook_manager = WEBHOOK_MANAGERS_BY_NAME[webhook.provider]()

credentials = (
creds_manager.get(user_id, webhook.credentials_id)
if webhook.credentials_id
else None
)
try:
await webhook_manager.trigger_ping(webhook, credentials)
except NotImplementedError:
return False

if not await wait_for_webhook_event(webhook_id, event_type="ping", timeout=10):
raise HTTPException(status_code=504, detail="Webhook ping timed out")

await webhook_manager.trigger_ping(webhook)
if not await listen_for_webhook_event(webhook_id, event_type="ping"):
raise HTTPException(status_code=500, detail="Webhook ping event not received")
return True


# --------------------------- UTILITIES ---------------------------- #
Expand Down
3 changes: 3 additions & 0 deletions autogpt_platform/backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,6 @@ ignore_patterns = []

[tool.pytest.ini_options]
asyncio_mode = "auto"

[tool.ruff]
target-version = "py310"
9 changes: 6 additions & 3 deletions autogpt_platform/frontend/src/app/providers.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import * as React from "react";
import { ThemeProvider as NextThemesProvider } from "next-themes";
import { ThemeProviderProps } from "next-themes/dist/types";
import { BackendAPIProvider } from "@/lib/autogpt-server-api";
import { TooltipProvider } from "@/components/ui/tooltip";
import SupabaseProvider from "@/components/SupabaseProvider";
import CredentialsProvider from "@/components/integrations/credentials-provider";
Expand All @@ -11,9 +12,11 @@ export function Providers({ children, ...props }: ThemeProviderProps) {
return (
<NextThemesProvider {...props}>
<SupabaseProvider>
<CredentialsProvider>
<TooltipProvider>{children}</TooltipProvider>
</CredentialsProvider>
<BackendAPIProvider>
<CredentialsProvider>
<TooltipProvider>{children}</TooltipProvider>
</CredentialsProvider>
</BackendAPIProvider>
</SupabaseProvider>
</NextThemesProvider>
);
Expand Down
Loading

0 comments on commit 64f5e60

Please sign in to comment.