Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blocks): Add webhook block status indicator #8838

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions autogpt_platform/backend/backend/data/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,25 +144,28 @@ class WebhookEventBus(AsyncRedisEventBus[WebhookEvent]):
def event_bus_name(self) -> str:
return "webhooks"

async def publish(self, event: WebhookEvent):
await self.publish_event(event, f"{event.webhook_id}/{event.event_type}")

async def listen(
self, webhook_id: str, event_type: Optional[str] = None
) -> AsyncGenerator[WebhookEvent, None]:
async for event in self.listen_events(f"{webhook_id}/{event_type or '*'}"):
yield event


event_bus = WebhookEventBus()
_webhook_event_bus = WebhookEventBus()


async def publish_webhook_event(event: WebhookEvent):
await event_bus.publish(event)
await _webhook_event_bus.publish_event(
event, f"{event.webhook_id}/{event.event_type}"
)


async def listen_for_webhook_event(
async def listen_for_webhook_events(
webhook_id: str, event_type: Optional[str] = None
) -> AsyncGenerator[WebhookEvent, None]:
async for event in _webhook_event_bus.listen_events(
f"{webhook_id}/{event_type or '*'}"
):
yield event


async def wait_for_webhook_event(
webhook_id: str, event_type: Optional[str] = None, timeout: Optional[float] = None
) -> WebhookEvent | None:
async for event in event_bus.listen(webhook_id, event_type):
return event # Only one event is expected
return await _webhook_event_bus.wait_for_event(
f"{webhook_id}/{event_type or '*'}", timeout
)
43 changes: 29 additions & 14 deletions autogpt_platform/backend/backend/data/queue.py
ntindle marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, AsyncGenerator, Generator, Generic, TypeVar
from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar

from pydantic import BaseModel
from redis.asyncio.client import PubSub as AsyncPubSub
Expand Down Expand Up @@ -48,12 +49,12 @@ def _deserialize_message(self, msg: Any, channel_key: str) -> M | None:
except Exception as e:
logger.error(f"Failed to parse event result from Redis {msg} {e}")

def _subscribe(
def _get_pubsub_channel(
self, connection: redis.Redis | redis.AsyncRedis, channel_key: str
) -> tuple[PubSub | AsyncPubSub, str]:
channel_name = f"{self.event_bus_name}/{channel_key}"
full_channel_name = f"{self.event_bus_name}/{channel_key}"
pubsub = connection.pubsub()
return pubsub, channel_name
return pubsub, full_channel_name


class RedisEventBus(BaseRedisEventBus[M], ABC):
Expand All @@ -64,17 +65,19 @@ def connection(self) -> redis.Redis:
return redis.get_redis()

def publish_event(self, event: M, channel_key: str):
message, channel_name = self._serialize_message(event, channel_key)
self.connection.publish(channel_name, message)
message, full_channel_name = self._serialize_message(event, channel_key)
self.connection.publish(full_channel_name, message)

def listen_events(self, channel_key: str) -> Generator[M, None, None]:
pubsub, channel_name = self._subscribe(self.connection, channel_key)
pubsub, full_channel_name = self._get_pubsub_channel(
self.connection, channel_key
)
assert isinstance(pubsub, PubSub)

if "*" in channel_key:
pubsub.psubscribe(channel_name)
pubsub.psubscribe(full_channel_name)
else:
pubsub.subscribe(channel_name)
pubsub.subscribe(full_channel_name)

for message in pubsub.listen():
if event := self._deserialize_message(message, channel_key):
Expand All @@ -89,19 +92,31 @@ async def connection(self) -> redis.AsyncRedis:
return await redis.get_redis_async()

async def publish_event(self, event: M, channel_key: str):
message, channel_name = self._serialize_message(event, channel_key)
message, full_channel_name = self._serialize_message(event, channel_key)
connection = await self.connection
await connection.publish(channel_name, message)
await connection.publish(full_channel_name, message)

async def listen_events(self, channel_key: str) -> AsyncGenerator[M, None]:
pubsub, channel_name = self._subscribe(await self.connection, channel_key)
pubsub, full_channel_name = self._get_pubsub_channel(
await self.connection, channel_key
)
assert isinstance(pubsub, AsyncPubSub)

if "*" in channel_key:
await pubsub.psubscribe(channel_name)
await pubsub.psubscribe(full_channel_name)
else:
await pubsub.subscribe(channel_name)
await pubsub.subscribe(full_channel_name)

async for message in pubsub.listen():
if event := self._deserialize_message(message, channel_key):
yield event

async def wait_for_event(
self, channel_key: str, timeout: Optional[float] = None
) -> M | None:
try:
return await asyncio.wait_for(
anext(aiter(self.listen_events(channel_key))), timeout
)
except TimeoutError:
return None
Pwuts marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ async def validate_payload(
# --8<-- [end:BaseWebhooksManager3]

# --8<-- [start:BaseWebhooksManager5]
async def trigger_ping(self, webhook: integrations.Webhook) -> None:
async def trigger_ping(
self, webhook: integrations.Webhook, credentials: Credentials | None
) -> None:
"""
Triggers a ping to the given webhook.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,15 @@ async def validate_payload(

return payload, event_type

async def trigger_ping(self, webhook: integrations.Webhook) -> None:
async def trigger_ping(
self, webhook: integrations.Webhook, credentials: Credentials | None
) -> None:
if not credentials:
raise ValueError("Credentials are required but were not passed")

headers = {
**self.GITHUB_API_DEFAULT_HEADERS,
"Authorization": f"Bearer {webhook.config.get('access_token')}",
"Authorization": credentials.bearer(),
}

repo, github_hook_id = webhook.resource, webhook.provider_webhook_id
Expand Down
24 changes: 17 additions & 7 deletions autogpt_platform/backend/backend/server/integrations/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
WebhookEvent,
get_all_webhooks,
get_webhook,
listen_for_webhook_event,
publish_webhook_event,
wait_for_webhook_event,
)
from backend.executor.manager import ExecutionManager
from backend.integrations.creds_manager import IntegrationCredentialsManager
Expand Down Expand Up @@ -300,18 +300,28 @@ async def webhook_ingress_generic(
)


@router.post("/{provider}/webhooks/{webhook_id}/ping")
@router.post("/webhooks/{webhook_id}/ping")
async def webhook_ping(
provider: Annotated[str, Path(title="Provider where the webhook was registered")],
webhook_id: Annotated[str, Path(title="Our ID for the webhook")],
user_id: Annotated[str, Depends(get_user_id)], # require auth
):
webhook_manager = WEBHOOK_MANAGERS_BY_NAME[provider]()
webhook = await get_webhook(webhook_id)
webhook_manager = WEBHOOK_MANAGERS_BY_NAME[webhook.provider]()

credentials = (
creds_manager.get(user_id, webhook.credentials_id)
if webhook.credentials_id
else None
)
try:
await webhook_manager.trigger_ping(webhook, credentials)
except NotImplementedError:
return False

if not await wait_for_webhook_event(webhook_id, event_type="ping", timeout=10):
raise HTTPException(status_code=504, detail="Webhook ping timed out")

await webhook_manager.trigger_ping(webhook)
if not await listen_for_webhook_event(webhook_id, event_type="ping"):
raise HTTPException(status_code=500, detail="Webhook ping event not received")
return True


# --------------------------- UTILITIES ---------------------------- #
Expand Down
3 changes: 3 additions & 0 deletions autogpt_platform/backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,6 @@ ignore_patterns = []

[tool.pytest.ini_options]
asyncio_mode = "auto"

[tool.ruff]
target-version = "py310"
Pwuts marked this conversation as resolved.
Show resolved Hide resolved
9 changes: 6 additions & 3 deletions autogpt_platform/frontend/src/app/providers.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import * as React from "react";
import { ThemeProvider as NextThemesProvider } from "next-themes";
import { ThemeProviderProps } from "next-themes/dist/types";
import { BackendAPIProvider } from "@/lib/autogpt-server-api";
import { TooltipProvider } from "@/components/ui/tooltip";
import SupabaseProvider from "@/components/SupabaseProvider";
import CredentialsProvider from "@/components/integrations/credentials-provider";
Expand All @@ -11,9 +12,11 @@ export function Providers({ children, ...props }: ThemeProviderProps) {
return (
<NextThemesProvider {...props}>
<SupabaseProvider>
<CredentialsProvider>
<TooltipProvider>{children}</TooltipProvider>
</CredentialsProvider>
<BackendAPIProvider>
<CredentialsProvider>
<TooltipProvider>{children}</TooltipProvider>
</CredentialsProvider>
</BackendAPIProvider>
</SupabaseProvider>
</NextThemesProvider>
);
Expand Down
Loading
Loading