From ae9507a0d86e303b7f861f3c1935d0b28d9e7131 Mon Sep 17 00:00:00 2001 From: Reinier van der Leer Date: Mon, 25 Nov 2024 15:11:09 +0000 Subject: [PATCH 1/4] add `webhookId` to frontend node data model --- autogpt_platform/frontend/src/components/CustomNode.tsx | 1 + autogpt_platform/frontend/src/hooks/useAgentGraph.ts | 1 + autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts | 1 + 3 files changed, 3 insertions(+) diff --git a/autogpt_platform/frontend/src/components/CustomNode.tsx b/autogpt_platform/frontend/src/components/CustomNode.tsx index 01f43e313490..da17bd245898 100644 --- a/autogpt_platform/frontend/src/components/CustomNode.tsx +++ b/autogpt_platform/frontend/src/components/CustomNode.tsx @@ -68,6 +68,7 @@ export type CustomNodeData = { outputSchema: BlockIORootSchema; hardcodedValues: { [key: string]: any }; connections: ConnectionData; + webhookId?: string; isOutputOpen: boolean; status?: NodeExecutionResult["status"]; /** executionResults contains outputs across multiple executions diff --git a/autogpt_platform/frontend/src/hooks/useAgentGraph.ts b/autogpt_platform/frontend/src/hooks/useAgentGraph.ts index 7ffec36e8278..213aa2ac4c43 100644 --- a/autogpt_platform/frontend/src/hooks/useAgentGraph.ts +++ b/autogpt_platform/frontend/src/hooks/useAgentGraph.ts @@ -170,6 +170,7 @@ export default function useAgentGraph( inputSchema: block.inputSchema, outputSchema: block.outputSchema, hardcodedValues: node.input_default, + webhookId: node.webhook_id, uiType: block.uiType, connections: graph.links .filter((l) => [l.source_id, l.sink_id].includes(node.id)) diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts index 59c1b717d38a..1180ee96194c 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts @@ -165,6 +165,7 @@ export type Node = { position: { x: number; y: number }; [key: string]: any; }; + webhook_id?: string; }; /* Mirror of backend/data/graph.py:Link */ From c1b223513c2403dbb4503b24f3932ee44f667350 Mon Sep 17 00:00:00 2001 From: Reinier van der Leer Date: Thu, 28 Nov 2024 23:32:36 +0100 Subject: [PATCH 2/4] Fix webhook ping endpoint - refactor stuff in `backend.data.queue` for understandability - clean up and update `backend.data.integrations` - add `AsyncRedisEventBus.wait_for_event(..)` - add `credentials` parameter to `WebhooksManager.trigger_ping(..)` - fix `GithubWebhooksManager.trigger_ping(..)` - remove `provider` path parameter from webhook ping endpoint - fix webhook ping endpoint return values and error handling --- .../backend/backend/data/integrations.py | 31 +++++++------ .../backend/backend/data/queue.py | 43 +++++++++++++------ .../backend/integrations/webhooks/base.py | 4 +- .../backend/integrations/webhooks/github.py | 9 +++- .../backend/server/integrations/router.py | 24 ++++++++--- autogpt_platform/backend/pyproject.toml | 3 ++ 6 files changed, 76 insertions(+), 38 deletions(-) diff --git a/autogpt_platform/backend/backend/data/integrations.py b/autogpt_platform/backend/backend/data/integrations.py index f86ecd3a4e49..4d5197f693a8 100644 --- a/autogpt_platform/backend/backend/data/integrations.py +++ b/autogpt_platform/backend/backend/data/integrations.py @@ -144,25 +144,28 @@ class WebhookEventBus(AsyncRedisEventBus[WebhookEvent]): def event_bus_name(self) -> str: return "webhooks" - async def publish(self, event: WebhookEvent): - await self.publish_event(event, f"{event.webhook_id}/{event.event_type}") - async def listen( - self, webhook_id: str, event_type: Optional[str] = None - ) -> AsyncGenerator[WebhookEvent, None]: - async for event in self.listen_events(f"{webhook_id}/{event_type or '*'}"): - yield event - - -event_bus = WebhookEventBus() +_webhook_event_bus = WebhookEventBus() async def publish_webhook_event(event: WebhookEvent): - await event_bus.publish(event) + await _webhook_event_bus.publish_event( + event, f"{event.webhook_id}/{event.event_type}" + ) -async def listen_for_webhook_event( +async def listen_for_webhook_events( webhook_id: str, event_type: Optional[str] = None +) -> AsyncGenerator[WebhookEvent, None]: + async for event in _webhook_event_bus.listen_events( + f"{webhook_id}/{event_type or '*'}" + ): + yield event + + +async def wait_for_webhook_event( + webhook_id: str, event_type: Optional[str] = None, timeout: Optional[float] = None ) -> WebhookEvent | None: - async for event in event_bus.listen(webhook_id, event_type): - return event # Only one event is expected + return await _webhook_event_bus.wait_for_event( + f"{webhook_id}/{event_type or '*'}", timeout + ) diff --git a/autogpt_platform/backend/backend/data/queue.py b/autogpt_platform/backend/backend/data/queue.py index b6fa72d53046..8fbdb03cd6f0 100644 --- a/autogpt_platform/backend/backend/data/queue.py +++ b/autogpt_platform/backend/backend/data/queue.py @@ -1,8 +1,9 @@ +import asyncio import json import logging from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, AsyncGenerator, Generator, Generic, TypeVar +from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar from pydantic import BaseModel from redis.asyncio.client import PubSub as AsyncPubSub @@ -48,12 +49,12 @@ def _deserialize_message(self, msg: Any, channel_key: str) -> M | None: except Exception as e: logger.error(f"Failed to parse event result from Redis {msg} {e}") - def _subscribe( + def _get_pubsub_channel( self, connection: redis.Redis | redis.AsyncRedis, channel_key: str ) -> tuple[PubSub | AsyncPubSub, str]: - channel_name = f"{self.event_bus_name}/{channel_key}" + full_channel_name = f"{self.event_bus_name}/{channel_key}" pubsub = connection.pubsub() - return pubsub, channel_name + return pubsub, full_channel_name class RedisEventBus(BaseRedisEventBus[M], ABC): @@ -64,17 +65,19 @@ def connection(self) -> redis.Redis: return redis.get_redis() def publish_event(self, event: M, channel_key: str): - message, channel_name = self._serialize_message(event, channel_key) - self.connection.publish(channel_name, message) + message, full_channel_name = self._serialize_message(event, channel_key) + self.connection.publish(full_channel_name, message) def listen_events(self, channel_key: str) -> Generator[M, None, None]: - pubsub, channel_name = self._subscribe(self.connection, channel_key) + pubsub, full_channel_name = self._get_pubsub_channel( + self.connection, channel_key + ) assert isinstance(pubsub, PubSub) if "*" in channel_key: - pubsub.psubscribe(channel_name) + pubsub.psubscribe(full_channel_name) else: - pubsub.subscribe(channel_name) + pubsub.subscribe(full_channel_name) for message in pubsub.listen(): if event := self._deserialize_message(message, channel_key): @@ -89,19 +92,31 @@ async def connection(self) -> redis.AsyncRedis: return await redis.get_redis_async() async def publish_event(self, event: M, channel_key: str): - message, channel_name = self._serialize_message(event, channel_key) + message, full_channel_name = self._serialize_message(event, channel_key) connection = await self.connection - await connection.publish(channel_name, message) + await connection.publish(full_channel_name, message) async def listen_events(self, channel_key: str) -> AsyncGenerator[M, None]: - pubsub, channel_name = self._subscribe(await self.connection, channel_key) + pubsub, full_channel_name = self._get_pubsub_channel( + await self.connection, channel_key + ) assert isinstance(pubsub, AsyncPubSub) if "*" in channel_key: - await pubsub.psubscribe(channel_name) + await pubsub.psubscribe(full_channel_name) else: - await pubsub.subscribe(channel_name) + await pubsub.subscribe(full_channel_name) async for message in pubsub.listen(): if event := self._deserialize_message(message, channel_key): yield event + + async def wait_for_event( + self, channel_key: str, timeout: Optional[float] = None + ) -> M | None: + try: + return await asyncio.wait_for( + anext(aiter(self.listen_events(channel_key))), timeout + ) + except TimeoutError: + return None diff --git a/autogpt_platform/backend/backend/integrations/webhooks/base.py b/autogpt_platform/backend/backend/integrations/webhooks/base.py index b30f419a03e8..03092a94a6fa 100644 --- a/autogpt_platform/backend/backend/integrations/webhooks/base.py +++ b/autogpt_platform/backend/backend/integrations/webhooks/base.py @@ -81,7 +81,9 @@ async def validate_payload( # --8<-- [end:BaseWebhooksManager3] # --8<-- [start:BaseWebhooksManager5] - async def trigger_ping(self, webhook: integrations.Webhook) -> None: + async def trigger_ping( + self, webhook: integrations.Webhook, credentials: Credentials | None + ) -> None: """ Triggers a ping to the given webhook. diff --git a/autogpt_platform/backend/backend/integrations/webhooks/github.py b/autogpt_platform/backend/backend/integrations/webhooks/github.py index 25152caff438..28b5b4eb0da5 100644 --- a/autogpt_platform/backend/backend/integrations/webhooks/github.py +++ b/autogpt_platform/backend/backend/integrations/webhooks/github.py @@ -58,10 +58,15 @@ async def validate_payload( return payload, event_type - async def trigger_ping(self, webhook: integrations.Webhook) -> None: + async def trigger_ping( + self, webhook: integrations.Webhook, credentials: Credentials | None + ) -> None: + if not credentials: + raise ValueError("Credentials are required but were not passed") + headers = { **self.GITHUB_API_DEFAULT_HEADERS, - "Authorization": f"Bearer {webhook.config.get('access_token')}", + "Authorization": credentials.bearer(), } repo, github_hook_id = webhook.resource, webhook.provider_webhook_id diff --git a/autogpt_platform/backend/backend/server/integrations/router.py b/autogpt_platform/backend/backend/server/integrations/router.py index d18ba5c46e40..2d0017bb827b 100644 --- a/autogpt_platform/backend/backend/server/integrations/router.py +++ b/autogpt_platform/backend/backend/server/integrations/router.py @@ -15,8 +15,8 @@ WebhookEvent, get_all_webhooks, get_webhook, - listen_for_webhook_event, publish_webhook_event, + wait_for_webhook_event, ) from backend.executor.manager import ExecutionManager from backend.integrations.creds_manager import IntegrationCredentialsManager @@ -300,18 +300,28 @@ async def webhook_ingress_generic( ) -@router.post("/{provider}/webhooks/{webhook_id}/ping") +@router.post("/webhooks/{webhook_id}/ping") async def webhook_ping( - provider: Annotated[str, Path(title="Provider where the webhook was registered")], webhook_id: Annotated[str, Path(title="Our ID for the webhook")], user_id: Annotated[str, Depends(get_user_id)], # require auth ): - webhook_manager = WEBHOOK_MANAGERS_BY_NAME[provider]() webhook = await get_webhook(webhook_id) + webhook_manager = WEBHOOK_MANAGERS_BY_NAME[webhook.provider]() + + credentials = ( + creds_manager.get(user_id, webhook.credentials_id) + if webhook.credentials_id + else None + ) + try: + await webhook_manager.trigger_ping(webhook, credentials) + except NotImplementedError: + return False + + if not await wait_for_webhook_event(webhook_id, event_type="ping", timeout=10): + raise HTTPException(status_code=504, detail="Webhook ping timed out") - await webhook_manager.trigger_ping(webhook) - if not await listen_for_webhook_event(webhook_id, event_type="ping"): - raise HTTPException(status_code=500, detail="Webhook ping event not received") + return True # --------------------------- UTILITIES ---------------------------- # diff --git a/autogpt_platform/backend/pyproject.toml b/autogpt_platform/backend/pyproject.toml index bcfe089c0755..95aac0c48dad 100644 --- a/autogpt_platform/backend/pyproject.toml +++ b/autogpt_platform/backend/pyproject.toml @@ -90,3 +90,6 @@ ignore_patterns = [] [tool.pytest.ini_options] asyncio_mode = "auto" + +[tool.ruff] +target-version = "py310" From 325f32e0f28c80d4f98e1026804d5abbc027336c Mon Sep 17 00:00:00 2001 From: Reinier van der Leer Date: Thu, 28 Nov 2024 23:37:24 +0100 Subject: [PATCH 3/4] frontend: add `BackendAPIProvider` + `useBackendAPI` --- .../frontend/src/app/providers.tsx | 9 ++++-- .../src/lib/autogpt-server-api/baseClient.ts | 9 ++++++ .../src/lib/autogpt-server-api/client.ts | 2 +- .../src/lib/autogpt-server-api/context.tsx | 28 +++++++++++++++++++ .../src/lib/autogpt-server-api/index.ts | 4 ++- 5 files changed, 47 insertions(+), 5 deletions(-) create mode 100644 autogpt_platform/frontend/src/lib/autogpt-server-api/context.tsx diff --git a/autogpt_platform/frontend/src/app/providers.tsx b/autogpt_platform/frontend/src/app/providers.tsx index 93db52de71c1..fe469ba0cef2 100644 --- a/autogpt_platform/frontend/src/app/providers.tsx +++ b/autogpt_platform/frontend/src/app/providers.tsx @@ -3,6 +3,7 @@ import * as React from "react"; import { ThemeProvider as NextThemesProvider } from "next-themes"; import { ThemeProviderProps } from "next-themes/dist/types"; +import { BackendAPIProvider } from "@/lib/autogpt-server-api"; import { TooltipProvider } from "@/components/ui/tooltip"; import SupabaseProvider from "@/components/SupabaseProvider"; import CredentialsProvider from "@/components/integrations/credentials-provider"; @@ -11,9 +12,11 @@ export function Providers({ children, ...props }: ThemeProviderProps) { return ( - - {children} - + + + {children} + + ); diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts index c8f25d13ddcd..83d00ee0d783 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts @@ -241,6 +241,15 @@ export default class BaseAutoGPTServerAPI { ); } + /** + * @returns `true` if a ping event was received, `false` if provider doesn't support pinging but the webhook exists. + * @throws `Error` if the webhook does not exist. + * @throws `Error` if the attempt to ping timed out. + */ + async pingWebhook(webhook_id: string): Promise { + return this._request("POST", `/integrations/webhooks/${webhook_id}/ping`); + } + logMetric(metric: AnalyticsMetrics) { return this._request("POST", "/analytics/log_raw_metric", metric); } diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/client.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/client.ts index f9c1092574af..6d7a4d25c1c1 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/client.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/client.ts @@ -1,7 +1,7 @@ import { createClient } from "../supabase/client"; import BaseAutoGPTServerAPI from "./baseClient"; -export default class AutoGPTServerAPI extends BaseAutoGPTServerAPI { +export class AutoGPTServerAPI extends BaseAutoGPTServerAPI { constructor( baseUrl: string = process.env.NEXT_PUBLIC_AGPT_SERVER_URL || "http://localhost:8006/api", diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/context.tsx b/autogpt_platform/frontend/src/lib/autogpt-server-api/context.tsx new file mode 100644 index 000000000000..938e226dcd26 --- /dev/null +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/context.tsx @@ -0,0 +1,28 @@ +import { AutoGPTServerAPI } from "./client"; +import React, { createContext, useMemo } from "react"; + +const BackendAPIProviderContext = createContext(null); + +export function BackendAPIProvider({ + children, +}: { + children?: React.ReactNode; +}): React.ReactNode { + const api = useMemo(() => new AutoGPTServerAPI(), []); + + return ( + + {children} + + ); +} + +export function useBackendAPI(): AutoGPTServerAPI { + const context = React.useContext(BackendAPIProviderContext); + if (!context) { + throw new Error( + "useBackendAPI must be used within a BackendAPIProviderContext", + ); + } + return context; +} diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/index.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/index.ts index 234a72c75061..60047d2f3a5d 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/index.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/index.ts @@ -1,5 +1,7 @@ -import AutoGPTServerAPI from "./client"; +import { AutoGPTServerAPI } from "./client"; export default AutoGPTServerAPI; +export * from "./client"; +export * from "./context"; export * from "./types"; export * from "./utils"; From 592828a3de519a87df72314b7276e4daf8a3b6de Mon Sep 17 00:00:00 2001 From: Reinier van der Leer Date: Thu, 28 Nov 2024 23:38:01 +0100 Subject: [PATCH 4/4] add webhook status indicator to block - add webhook status indicator dot - fix updating of `node.data.webhookId` when the new value is `undefined` - fix layout of node header --- .../frontend/src/components/CustomNode.tsx | 126 +++++++++++++----- .../frontend/src/hooks/useAgentGraph.ts | 1 + 2 files changed, 94 insertions(+), 33 deletions(-) diff --git a/autogpt_platform/frontend/src/components/CustomNode.tsx b/autogpt_platform/frontend/src/components/CustomNode.tsx index da17bd245898..65ed2cb1e5e0 100644 --- a/autogpt_platform/frontend/src/components/CustomNode.tsx +++ b/autogpt_platform/frontend/src/components/CustomNode.tsx @@ -4,6 +4,7 @@ import React, { useCallback, useRef, useContext, + useMemo, } from "react"; import { NodeProps, useReactFlow, Node, Edge } from "@xyflow/react"; import "@xyflow/react/dist/style.css"; @@ -17,7 +18,8 @@ import { NodeExecutionResult, BlockUIType, BlockCost, -} from "@/lib/autogpt-server-api/types"; + useBackendAPI, +} from "@/lib/autogpt-server-api"; import { beautifyString, cn, @@ -105,6 +107,7 @@ export function CustomNode({ >(); const isInitialSetup = useRef(true); const flowContext = useContext(FlowContext); + const api = useBackendAPI(); let nodeFlowId = ""; if (data.uiType === BlockUIType.AGENT) { @@ -502,6 +505,57 @@ export function CustomNode({ ), ); + const [webhookStatus, setWebhookStatus] = useState< + "works" | "exists" | "broken" | "none" | "pending" | null + >(null); + + useEffect(() => { + if (data.uiType != BlockUIType.WEBHOOK) return; + if (!data.webhookId) { + setWebhookStatus("none"); + return; + } + + setWebhookStatus("pending"); + api + .pingWebhook(data.webhookId) + .then((pinged) => setWebhookStatus(pinged ? "works" : "exists")) + .catch((error: Error) => + error.message.includes("ping timed out") + ? setWebhookStatus("broken") + : setWebhookStatus("none"), + ); + }, [data.uiType, data.webhookId, api, setWebhookStatus]); + + const webhookStatusDot = useMemo( + () => + webhookStatus && ( +
+ ), + [webhookStatus], + ); + const LineSeparator = () => (
@@ -558,55 +612,61 @@ export function CustomNode({ > {/* Header */}
{/* Color Stripe */}
-
-
-
+
+
+

+

+ #{id.split("-")[0]} + +
-
- #{id.split("-")[0]} + {webhookStatusDot} + +
+
+ {blockCost && ( +
+ + {" "} + + {blockCost.cost_amount} + {" "} + credits/{blockCost.cost_type} +
-
+ )} + {data.categories.map((category) => ( + + {beautifyString(category.category.toLowerCase())} + + ))}
- {blockCost && ( -
- - {" "} - {blockCost.cost_amount}{" "} - credits/{blockCost.cost_type} - -
- )}
- {data.categories.map((category) => ( - - {beautifyString(category.category.toLowerCase())} - - ))} -
+ {/* Body */}
{/* Input Handles */} diff --git a/autogpt_platform/frontend/src/hooks/useAgentGraph.ts b/autogpt_platform/frontend/src/hooks/useAgentGraph.ts index 213aa2ac4c43..c8cdb8862f8f 100644 --- a/autogpt_platform/frontend/src/hooks/useAgentGraph.ts +++ b/autogpt_platform/frontend/src/hooks/useAgentGraph.ts @@ -777,6 +777,7 @@ export default function useAgentGraph( ), status: undefined, backend_id: backendNode.id, + webhookId: backendNode.webhook_id, executionResults: [], }, }