diff --git a/autogpt_platform/backend/backend/data/integrations.py b/autogpt_platform/backend/backend/data/integrations.py index f86ecd3a4e49..4d5197f693a8 100644 --- a/autogpt_platform/backend/backend/data/integrations.py +++ b/autogpt_platform/backend/backend/data/integrations.py @@ -144,25 +144,28 @@ class WebhookEventBus(AsyncRedisEventBus[WebhookEvent]): def event_bus_name(self) -> str: return "webhooks" - async def publish(self, event: WebhookEvent): - await self.publish_event(event, f"{event.webhook_id}/{event.event_type}") - async def listen( - self, webhook_id: str, event_type: Optional[str] = None - ) -> AsyncGenerator[WebhookEvent, None]: - async for event in self.listen_events(f"{webhook_id}/{event_type or '*'}"): - yield event - - -event_bus = WebhookEventBus() +_webhook_event_bus = WebhookEventBus() async def publish_webhook_event(event: WebhookEvent): - await event_bus.publish(event) + await _webhook_event_bus.publish_event( + event, f"{event.webhook_id}/{event.event_type}" + ) -async def listen_for_webhook_event( +async def listen_for_webhook_events( webhook_id: str, event_type: Optional[str] = None +) -> AsyncGenerator[WebhookEvent, None]: + async for event in _webhook_event_bus.listen_events( + f"{webhook_id}/{event_type or '*'}" + ): + yield event + + +async def wait_for_webhook_event( + webhook_id: str, event_type: Optional[str] = None, timeout: Optional[float] = None ) -> WebhookEvent | None: - async for event in event_bus.listen(webhook_id, event_type): - return event # Only one event is expected + return await _webhook_event_bus.wait_for_event( + f"{webhook_id}/{event_type or '*'}", timeout + ) diff --git a/autogpt_platform/backend/backend/data/queue.py b/autogpt_platform/backend/backend/data/queue.py index b6fa72d53046..8fbdb03cd6f0 100644 --- a/autogpt_platform/backend/backend/data/queue.py +++ b/autogpt_platform/backend/backend/data/queue.py @@ -1,8 +1,9 @@ +import asyncio import json import logging from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, AsyncGenerator, Generator, Generic, TypeVar +from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar from pydantic import BaseModel from redis.asyncio.client import PubSub as AsyncPubSub @@ -48,12 +49,12 @@ def _deserialize_message(self, msg: Any, channel_key: str) -> M | None: except Exception as e: logger.error(f"Failed to parse event result from Redis {msg} {e}") - def _subscribe( + def _get_pubsub_channel( self, connection: redis.Redis | redis.AsyncRedis, channel_key: str ) -> tuple[PubSub | AsyncPubSub, str]: - channel_name = f"{self.event_bus_name}/{channel_key}" + full_channel_name = f"{self.event_bus_name}/{channel_key}" pubsub = connection.pubsub() - return pubsub, channel_name + return pubsub, full_channel_name class RedisEventBus(BaseRedisEventBus[M], ABC): @@ -64,17 +65,19 @@ def connection(self) -> redis.Redis: return redis.get_redis() def publish_event(self, event: M, channel_key: str): - message, channel_name = self._serialize_message(event, channel_key) - self.connection.publish(channel_name, message) + message, full_channel_name = self._serialize_message(event, channel_key) + self.connection.publish(full_channel_name, message) def listen_events(self, channel_key: str) -> Generator[M, None, None]: - pubsub, channel_name = self._subscribe(self.connection, channel_key) + pubsub, full_channel_name = self._get_pubsub_channel( + self.connection, channel_key + ) assert isinstance(pubsub, PubSub) if "*" in channel_key: - pubsub.psubscribe(channel_name) + pubsub.psubscribe(full_channel_name) else: - pubsub.subscribe(channel_name) + pubsub.subscribe(full_channel_name) for message in pubsub.listen(): if event := self._deserialize_message(message, channel_key): @@ -89,19 +92,31 @@ async def connection(self) -> redis.AsyncRedis: return await redis.get_redis_async() async def publish_event(self, event: M, channel_key: str): - message, channel_name = self._serialize_message(event, channel_key) + message, full_channel_name = self._serialize_message(event, channel_key) connection = await self.connection - await connection.publish(channel_name, message) + await connection.publish(full_channel_name, message) async def listen_events(self, channel_key: str) -> AsyncGenerator[M, None]: - pubsub, channel_name = self._subscribe(await self.connection, channel_key) + pubsub, full_channel_name = self._get_pubsub_channel( + await self.connection, channel_key + ) assert isinstance(pubsub, AsyncPubSub) if "*" in channel_key: - await pubsub.psubscribe(channel_name) + await pubsub.psubscribe(full_channel_name) else: - await pubsub.subscribe(channel_name) + await pubsub.subscribe(full_channel_name) async for message in pubsub.listen(): if event := self._deserialize_message(message, channel_key): yield event + + async def wait_for_event( + self, channel_key: str, timeout: Optional[float] = None + ) -> M | None: + try: + return await asyncio.wait_for( + anext(aiter(self.listen_events(channel_key))), timeout + ) + except TimeoutError: + return None diff --git a/autogpt_platform/backend/backend/integrations/webhooks/base.py b/autogpt_platform/backend/backend/integrations/webhooks/base.py index 61a07ce20353..b1c8a62ddc48 100644 --- a/autogpt_platform/backend/backend/integrations/webhooks/base.py +++ b/autogpt_platform/backend/backend/integrations/webhooks/base.py @@ -81,7 +81,9 @@ async def validate_payload( # --8<-- [end:BaseWebhooksManager3] # --8<-- [start:BaseWebhooksManager5] - async def trigger_ping(self, webhook: integrations.Webhook) -> None: + async def trigger_ping( + self, webhook: integrations.Webhook, credentials: Credentials | None + ) -> None: """ Triggers a ping to the given webhook. diff --git a/autogpt_platform/backend/backend/integrations/webhooks/github.py b/autogpt_platform/backend/backend/integrations/webhooks/github.py index 2393437d209a..679e771246b7 100644 --- a/autogpt_platform/backend/backend/integrations/webhooks/github.py +++ b/autogpt_platform/backend/backend/integrations/webhooks/github.py @@ -58,10 +58,15 @@ async def validate_payload( return payload, event_type - async def trigger_ping(self, webhook: integrations.Webhook) -> None: + async def trigger_ping( + self, webhook: integrations.Webhook, credentials: Credentials | None + ) -> None: + if not credentials: + raise ValueError("Credentials are required but were not passed") + headers = { **self.GITHUB_API_DEFAULT_HEADERS, - "Authorization": f"Bearer {webhook.config.get('access_token')}", + "Authorization": credentials.bearer(), } repo, github_hook_id = webhook.resource, webhook.provider_webhook_id diff --git a/autogpt_platform/backend/backend/server/integrations/router.py b/autogpt_platform/backend/backend/server/integrations/router.py index 4220ac3c0311..34770b677d8d 100644 --- a/autogpt_platform/backend/backend/server/integrations/router.py +++ b/autogpt_platform/backend/backend/server/integrations/router.py @@ -9,8 +9,8 @@ WebhookEvent, get_all_webhooks, get_webhook, - listen_for_webhook_event, publish_webhook_event, + wait_for_webhook_event, ) from backend.data.model import ( APIKeyCredentials, @@ -300,18 +300,28 @@ async def webhook_ingress_generic( ) -@router.post("/{provider}/webhooks/{webhook_id}/ping") +@router.post("/webhooks/{webhook_id}/ping") async def webhook_ping( - provider: Annotated[str, Path(title="Provider where the webhook was registered")], webhook_id: Annotated[str, Path(title="Our ID for the webhook")], user_id: Annotated[str, Depends(get_user_id)], # require auth ): - webhook_manager = WEBHOOK_MANAGERS_BY_NAME[provider]() webhook = await get_webhook(webhook_id) + webhook_manager = WEBHOOK_MANAGERS_BY_NAME[webhook.provider]() + + credentials = ( + creds_manager.get(user_id, webhook.credentials_id) + if webhook.credentials_id + else None + ) + try: + await webhook_manager.trigger_ping(webhook, credentials) + except NotImplementedError: + return False + + if not await wait_for_webhook_event(webhook_id, event_type="ping", timeout=10): + raise HTTPException(status_code=504, detail="Webhook ping timed out") - await webhook_manager.trigger_ping(webhook) - if not await listen_for_webhook_event(webhook_id, event_type="ping"): - raise HTTPException(status_code=500, detail="Webhook ping event not received") + return True # --------------------------- UTILITIES ---------------------------- # diff --git a/autogpt_platform/backend/pyproject.toml b/autogpt_platform/backend/pyproject.toml index 2fc7d7f5269e..6d42fbfb7c7b 100644 --- a/autogpt_platform/backend/pyproject.toml +++ b/autogpt_platform/backend/pyproject.toml @@ -90,3 +90,6 @@ ignore_patterns = [] [tool.pytest.ini_options] asyncio_mode = "auto" + +[tool.ruff] +target-version = "py310" diff --git a/autogpt_platform/frontend/src/app/providers.tsx b/autogpt_platform/frontend/src/app/providers.tsx index 93db52de71c1..fe469ba0cef2 100644 --- a/autogpt_platform/frontend/src/app/providers.tsx +++ b/autogpt_platform/frontend/src/app/providers.tsx @@ -3,6 +3,7 @@ import * as React from "react"; import { ThemeProvider as NextThemesProvider } from "next-themes"; import { ThemeProviderProps } from "next-themes/dist/types"; +import { BackendAPIProvider } from "@/lib/autogpt-server-api"; import { TooltipProvider } from "@/components/ui/tooltip"; import SupabaseProvider from "@/components/SupabaseProvider"; import CredentialsProvider from "@/components/integrations/credentials-provider"; @@ -11,9 +12,11 @@ export function Providers({ children, ...props }: ThemeProviderProps) { return ( - - {children} - + + + {children} + + ); diff --git a/autogpt_platform/frontend/src/components/CustomNode.tsx b/autogpt_platform/frontend/src/components/CustomNode.tsx index 1c4e8c98183d..9f1f5c86119b 100644 --- a/autogpt_platform/frontend/src/components/CustomNode.tsx +++ b/autogpt_platform/frontend/src/components/CustomNode.tsx @@ -4,6 +4,7 @@ import React, { useCallback, useRef, useContext, + useMemo, } from "react"; import { NodeProps, useReactFlow, Node, Edge } from "@xyflow/react"; import "@xyflow/react/dist/style.css"; @@ -17,7 +18,8 @@ import { NodeExecutionResult, BlockUIType, BlockCost, -} from "@/lib/autogpt-server-api/types"; + useBackendAPI, +} from "@/lib/autogpt-server-api"; import { beautifyString, cn, @@ -68,6 +70,7 @@ export type CustomNodeData = { outputSchema: BlockIORootSchema; hardcodedValues: { [key: string]: any }; connections: ConnectionData; + webhookId?: string; isOutputOpen: boolean; status?: NodeExecutionResult["status"]; /** executionResults contains outputs across multiple executions @@ -104,6 +107,7 @@ export function CustomNode({ >(); const isInitialSetup = useRef(true); const flowContext = useContext(FlowContext); + const api = useBackendAPI(); let nodeFlowId = ""; if (data.uiType === BlockUIType.AGENT) { @@ -513,6 +517,57 @@ export function CustomNode({ isCostFilterMatch(cost.cost_filter, inputValues), ); + const [webhookStatus, setWebhookStatus] = useState< + "works" | "exists" | "broken" | "none" | "pending" | null + >(null); + + useEffect(() => { + if (data.uiType != BlockUIType.WEBHOOK) return; + if (!data.webhookId) { + setWebhookStatus("none"); + return; + } + + setWebhookStatus("pending"); + api + .pingWebhook(data.webhookId) + .then((pinged) => setWebhookStatus(pinged ? "works" : "exists")) + .catch((error: Error) => + error.message.includes("ping timed out") + ? setWebhookStatus("broken") + : setWebhookStatus("none"), + ); + }, [data.uiType, data.webhookId, api, setWebhookStatus]); + + const webhookStatusDot = useMemo( + () => + webhookStatus && ( +
+ ), + [webhookStatus], + ); + const LineSeparator = () => (
@@ -580,55 +635,61 @@ export function CustomNode({ > {/* Header */}
{/* Color Stripe */}
-
-
-
+
+
+

+

+ #{id.split("-")[0]} + +
-
- #{id.split("-")[0]} + {webhookStatusDot} + +
+
+ {blockCost && ( +
+ + {" "} + + {blockCost.cost_amount} + {" "} + credits/{blockCost.cost_type} +
-
+ )} + {data.categories.map((category) => ( + + {beautifyString(category.category.toLowerCase())} + + ))}
- {blockCost && ( -
- - {" "} - {blockCost.cost_amount}{" "} - credits/{blockCost.cost_type} - -
- )}
- {data.categories.map((category) => ( - - {beautifyString(category.category.toLowerCase())} - - ))} -
+ {/* Body */}
{/* Input Handles */} diff --git a/autogpt_platform/frontend/src/hooks/useAgentGraph.ts b/autogpt_platform/frontend/src/hooks/useAgentGraph.ts index 7ffec36e8278..c8cdb8862f8f 100644 --- a/autogpt_platform/frontend/src/hooks/useAgentGraph.ts +++ b/autogpt_platform/frontend/src/hooks/useAgentGraph.ts @@ -170,6 +170,7 @@ export default function useAgentGraph( inputSchema: block.inputSchema, outputSchema: block.outputSchema, hardcodedValues: node.input_default, + webhookId: node.webhook_id, uiType: block.uiType, connections: graph.links .filter((l) => [l.source_id, l.sink_id].includes(node.id)) @@ -776,6 +777,7 @@ export default function useAgentGraph( ), status: undefined, backend_id: backendNode.id, + webhookId: backendNode.webhook_id, executionResults: [], }, } diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts index c8f25d13ddcd..83d00ee0d783 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts @@ -241,6 +241,15 @@ export default class BaseAutoGPTServerAPI { ); } + /** + * @returns `true` if a ping event was received, `false` if provider doesn't support pinging but the webhook exists. + * @throws `Error` if the webhook does not exist. + * @throws `Error` if the attempt to ping timed out. + */ + async pingWebhook(webhook_id: string): Promise { + return this._request("POST", `/integrations/webhooks/${webhook_id}/ping`); + } + logMetric(metric: AnalyticsMetrics) { return this._request("POST", "/analytics/log_raw_metric", metric); } diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/client.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/client.ts index f9c1092574af..6d7a4d25c1c1 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/client.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/client.ts @@ -1,7 +1,7 @@ import { createClient } from "../supabase/client"; import BaseAutoGPTServerAPI from "./baseClient"; -export default class AutoGPTServerAPI extends BaseAutoGPTServerAPI { +export class AutoGPTServerAPI extends BaseAutoGPTServerAPI { constructor( baseUrl: string = process.env.NEXT_PUBLIC_AGPT_SERVER_URL || "http://localhost:8006/api", diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/context.tsx b/autogpt_platform/frontend/src/lib/autogpt-server-api/context.tsx new file mode 100644 index 000000000000..938e226dcd26 --- /dev/null +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/context.tsx @@ -0,0 +1,28 @@ +import { AutoGPTServerAPI } from "./client"; +import React, { createContext, useMemo } from "react"; + +const BackendAPIProviderContext = createContext(null); + +export function BackendAPIProvider({ + children, +}: { + children?: React.ReactNode; +}): React.ReactNode { + const api = useMemo(() => new AutoGPTServerAPI(), []); + + return ( + + {children} + + ); +} + +export function useBackendAPI(): AutoGPTServerAPI { + const context = React.useContext(BackendAPIProviderContext); + if (!context) { + throw new Error( + "useBackendAPI must be used within a BackendAPIProviderContext", + ); + } + return context; +} diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/index.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/index.ts index 234a72c75061..60047d2f3a5d 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/index.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/index.ts @@ -1,5 +1,7 @@ -import AutoGPTServerAPI from "./client"; +import { AutoGPTServerAPI } from "./client"; export default AutoGPTServerAPI; +export * from "./client"; +export * from "./context"; export * from "./types"; export * from "./utils"; diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts index db822ec4ed13..118c8c440a4f 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts @@ -168,6 +168,7 @@ export type Node = { position: { x: number; y: number }; [key: string]: any; }; + webhook_id?: string; }; /* Mirror of backend/data/graph.py:Link */