Skip to content

Commit

Permalink
T5211 py3 port google chat webhook formatting (#184)
Browse files Browse the repository at this point in the history
* Add google chat api models

* add google chat webhook formatting

* improve webhook error handling

* update webhook tests

* Delete testdoc.xlsx

---------

Co-authored-by: William Leighton Dawson <leighton@thinkst.com>
  • Loading branch information
emmanuel-thinkst and wleightond committed Jul 18, 2023
1 parent b798873 commit f131f56
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 4 deletions.
49 changes: 48 additions & 1 deletion canarytokens/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@
# from canarytokens.exceptions import DuplicateChannel
from canarytokens.models import (
AnyTokenHit,
GoogleChatAlertDetailsSectionData,
GoogleChatCard,
GoogleChatCardV2,
GoogleChatHeader,
GoogleChatSection,
Memo,
SlackAttachment,
SlackField,
TokenAlertDetailGeneric,
TokenAlertDetails,
TokenAlertDetailsGoogleChat,
TokenAlertDetailsSlack,
TokenTypes,
)
Expand Down Expand Up @@ -49,6 +55,40 @@ def format_as_slack_canaryalert(details: TokenAlertDetails) -> TokenAlertDetails
)


def format_as_googlechat_canaryalert(
details: TokenAlertDetails,
) -> TokenAlertDetailsGoogleChat:
# construct google chat alert , top section
top_section = GoogleChatSection(header="Alert Details")
top_section.add_widgets(
widgets_info=GoogleChatAlertDetailsSectionData(
channel=details.channel,
time=details.time.strftime("%Y-%m-%d %H:%M:%S (UTC)"),
canarytoken=details.token,
token_reminder=details.memo,
manage_url=details.manage_url,
).get_googlechat_data()
)
# construct google chat alert , additional section
additional_section = GoogleChatSection(header="Additional Details")
additional_section.add_widgets(widgets_info=details.additional_data)

# construct google chat alert card
card = GoogleChatCard(
header=GoogleChatHeader(
title="Canarytoken Triggered",
imageUrl="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png",
imageType="CIRCLE",
imageAltText="Thinkst Canary",
),
sections=[top_section, additional_section],
)
# make google chat payload
return TokenAlertDetailsGoogleChat(
cardsV2=[GoogleChatCardV2(cardId="unique-card-id", card=card)]
)


class Channel(object):
CHANNEL = "Base"

Expand Down Expand Up @@ -214,8 +254,11 @@ def format_webhook_canaryalert(
canarydrop: Canarydrop,
protocol: str,
host: str, # DESIGN: Shift this to settings. Do we need to have this logic here?
) -> Union[TokenAlertDetailsSlack, TokenAlertDetailGeneric]:
) -> Union[
TokenAlertDetailsSlack, TokenAlertDetailGeneric, TokenAlertDetailsGoogleChat
]:
# TODO: Need to add `host` and `protocol` that can be used to manage the token.
googlechat_hook_base_url = "https://chat.googleapis.com"
details = cls.gather_alert_details(
canarydrop,
protocol=protocol,
Expand All @@ -225,6 +268,10 @@ def format_webhook_canaryalert(
"https://hooks.slack.com" in canarydrop.alert_webhook_url
):
return format_as_slack_canaryalert(details=details)
elif canarydrop.alert_webhook_url and (
str(canarydrop.alert_webhook_url).startswith(googlechat_hook_base_url)
):
return format_as_googlechat_canaryalert(details=details)
else:
return TokenAlertDetailGeneric(**details.dict())

Expand Down
88 changes: 88 additions & 0 deletions canarytokens/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
'4', '5', '6', '7', '8', '9']
# fmt: on
CANARYTOKEN_LENGTH = 25 # equivalent to 128-bit id
CANARYTOKEN_RE = re.compile(
".*([" + "".join(CANARYTOKEN_ALPHABET) + "]{" + str(CANARYTOKEN_LENGTH) + "}).*",
re.IGNORECASE,
)

CANARY_PDF_TEMPLATE_OFFSET: int = 793

Expand All @@ -68,6 +72,11 @@ class Hostname(ConstrainedStr):
)


class Canarytoken(ConstrainedStr):
max_length: int = CANARYTOKEN_LENGTH
regex = CANARYTOKEN_RE


class PseudoUrl(ConstrainedStr):
"""
Clonedweb token can get used where the `referer`
Expand Down Expand Up @@ -1517,6 +1526,85 @@ def __init__(__pydantic_self__, **data: Any) -> None:
super().__init__(**data)


class GoogleChatDecoratedText(BaseModel):
topLabel: str = ""
text: str = ""


class GoogleChatWidget(BaseModel):
decoratedText: GoogleChatDecoratedText


class GoogleChatAlertDetailsSectionData(BaseModel):
channel: str = ""
time: datetime
canarytoken: Canarytoken
token_reminder: Memo
manage_url: HttpUrl

@validator("time", pre=True)
def validate_time(cls, value):
if isinstance(value, str):
return datetime.strptime(value, "%Y-%m-%d %H:%M:%S (UTC)")
return value

def get_googlechat_data(self) -> Dict[str, str]:
data = json_safe_dict(self)
data["Channel"] = data.pop("channel", "")
data["Time"] = data.pop("time", "")
data["Canarytoken"] = data.pop("canarytoken", "")
data["Token Reminder"] = data.pop("token_reminder", "")
data["Manage URL"] = '<a href="{manage_url}">{manage_url}</a>'.format(
manage_url=data.pop("manage_url", "")
)
return data

class Config:
json_encoders = {
datetime: lambda v: v.strftime("%Y-%m-%d %H:%M:%S (UTC)"),
}


class GoogleChatHeader(BaseModel):
title: str = "Canarytoken Triggered"
imageUrl: HttpUrl
imageType: str = "CIRCLE"
imageAltText: str = "Thinkst Canary"


class GoogleChatSection(BaseModel):
header: str = ""
collapsible: bool = False
widgets: List[GoogleChatWidget] = []

def add_widgets(self, widgets_info: Optional[Dict[str, str]] = {}) -> None:
for (label, text) in widgets_info.items():
if not label or not text:
continue
self.widgets.append(
GoogleChatWidget(
decoratedText=GoogleChatDecoratedText(topLabel=label, text=text)
)
)


class GoogleChatCard(BaseModel):
header: GoogleChatHeader
sections: List[GoogleChatSection] = []


class GoogleChatCardV2(BaseModel):
cardId: str = "unique-card-id"
card: GoogleChatCard


class TokenAlertDetailsGoogleChat(BaseModel):
cardsV2: List[GoogleChatCardV2]

def json_safe_dict(self) -> Dict[str, str]:
return json_safe_dict(self)


class TokenAlertDetailsSlack(BaseModel):
"""Details that are sent to slack webhooks."""

Expand Down
24 changes: 22 additions & 2 deletions canarytokens/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,8 +789,13 @@ def validate_webhook(url, token_type: models.TokenTypes):
url -- Webhook url
"""
slack = "https://hooks.slack.com"
payload: Union[models.TokenAlertDetails, models.TokenAlertDetailsSlack]
if slack in url:
googlechat_hook_base_url = "https://chat.googleapis.com"
payload: Union[
models.TokenAlertDetails,
models.TokenAlertDetailsSlack,
models.TokenAlertDetailsGoogleChat,
]
if url.startswith(slack):
payload = models.TokenAlertDetailsSlack(
attachments=[
models.SlackAttachment(
Expand All @@ -804,6 +809,21 @@ def validate_webhook(url, token_type: models.TokenTypes):
)
]
)
elif url.startswith(googlechat_hook_base_url):
# construct google chat alert card
card = models.GoogleChatCard(
header=models.GoogleChatHeader(
title="Validating new canarytokens webhook",
imageUrl="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png",
imageType="CIRCLE",
imageAltText="Thinkst Canary",
),
sections=[],
)
# make google chat payload
payload = models.TokenAlertDetailsGoogleChat(
cardsV2=[models.GoogleChatCardV2(cardId="unique-card-id", card=card)]
)
else:
payload = models.TokenAlertDetails(
manage_url=HttpUrl(
Expand Down
120 changes: 119 additions & 1 deletion tests/units/test_channel_output_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from twisted.logger import capturedLogs

from canarytokens.canarydrop import Canarydrop
from canarytokens.channel import format_as_googlechat_canaryalert
from canarytokens.channel_dns import ChannelDNS
from canarytokens.channel_output_webhook import WebhookOutputChannel
from canarytokens.models import TokenTypes
from canarytokens.models import TokenAlertDetailsGoogleChat, TokenTypes
from canarytokens.settings import Settings
from canarytokens.switchboard import Switchboard
from canarytokens.tokens import Canarytoken
Expand Down Expand Up @@ -135,3 +136,120 @@ def test_broken_2_webhook(setup_db, webhook_receiver, settings: Settings):
assert any(
["Failed sending request to webhook" in log["log_format"] for log in captured]
)


def test_googlechat_webhook_format(setup_db, webhook_receiver, settings: Settings):

switchboard = Switchboard()
input_channel = ChannelDNS(
switchboard=switchboard,
settings=settings,
backend_hostname="test.com",
backend_scheme="https",
listen_domain=settings.LISTEN_DOMAIN,
)

cd = Canarydrop(
type=TokenTypes.DNS,
generate=True,
alert_email_enabled=False,
alert_email_recipient="email@test.com",
alert_webhook_enabled=False,
alert_webhook_url=webhook_receiver,
canarytoken=Canarytoken(),
memo="memo",
browser_scanner_enabled=False,
)
token_hit = Canarytoken.create_token_hit(
token_type=TokenTypes.DNS,
input_channel="not_valid",
src_ip="127.0.0.1",
hit_info={"some": "data"},
)
cd.add_canarydrop_hit(token_hit=token_hit)
details = input_channel.gather_alert_details(
cd,
protocol=input_channel.backend_scheme,
host=input_channel.backend_hostname,
)
print("Webhook details = {}".format(details))
webhook_payload = format_as_googlechat_canaryalert(details=details)
webhook_payload_json = webhook_payload.json_safe_dict()
print("Webhook_payload json = {}".format(webhook_payload.json()))

assert "cardsV2" in webhook_payload_json.keys()
assert type(webhook_payload_json["cardsV2"]) is list
assert len(webhook_payload_json["cardsV2"]) == 1
for key in ["cardId", "card"]:
assert key in webhook_payload_json["cardsV2"][0].keys()
assert type(webhook_payload_json["cardsV2"][0]["cardId"]) is str
assert type(webhook_payload_json["cardsV2"][0]["card"]) is dict
for key in ["header", "sections"]:
assert key in webhook_payload_json["cardsV2"][0]["card"].keys()
assert type(webhook_payload_json["cardsV2"][0]["card"]["header"]) is dict
for key in ["title", "imageUrl", "imageType", "imageAltText"]:
assert key in webhook_payload_json["cardsV2"][0]["card"]["header"].keys()
assert type(webhook_payload_json["cardsV2"][0]["card"]["header"][key]) is str
assert type(webhook_payload_json["cardsV2"][0]["card"]["sections"]) is list
assert len(webhook_payload_json["cardsV2"][0]["card"]["sections"]) == 2
assert type(webhook_payload_json["cardsV2"][0]["card"]["sections"][0]) is dict
for key in ["header", "collapsible", "widgets"]:
assert key in webhook_payload_json["cardsV2"][0]["card"]["sections"][0].keys()
assert (
type(webhook_payload_json["cardsV2"][0]["card"]["sections"][0]["header"]) is str
)
assert (
type(webhook_payload_json["cardsV2"][0]["card"]["sections"][0]["collapsible"])
is bool
)
assert (
type(webhook_payload_json["cardsV2"][0]["card"]["sections"][0]["widgets"])
is list
)
assert type(webhook_payload_json["cardsV2"][0]["card"]["sections"][1]) is dict
for key in ["header", "collapsible", "widgets"]:
assert key in webhook_payload_json["cardsV2"][0]["card"]["sections"][1].keys()


def test_canaryalert_googlechat_webhook(setup_db, webhook_receiver, settings: Settings):
"""
Tests if a google chat webhook payload, is produced given a googlechat webhook receiver.
"""
googlechat_webhook_receiver = (
"https://chat.googleapis.com/v1/spaces/random/messages?key=temp_key"
)

switchboard = Switchboard()
input_channel = ChannelDNS(
switchboard=switchboard,
settings=settings,
backend_hostname="test.com",
backend_scheme="https",
listen_domain=settings.LISTEN_DOMAIN,
)

cd = Canarydrop(
type=TokenTypes.DNS,
generate=True,
alert_email_enabled=False,
alert_email_recipient="email@test.com",
alert_webhook_enabled=False,
alert_webhook_url=googlechat_webhook_receiver,
canarytoken=Canarytoken(),
memo="memo",
browser_scanner_enabled=False,
)
token_hit = Canarytoken.create_token_hit(
token_type=TokenTypes.DNS,
input_channel="not_valid",
src_ip="127.0.0.1",
hit_info={"some": "data"},
)
cd.add_canarydrop_hit(token_hit=token_hit)

canaryalert_webhook_payload = input_channel.format_webhook_canaryalert(
canarydrop=cd,
protocol=input_channel.backend_scheme,
host=input_channel.backend_hostname,
)
assert isinstance(canaryalert_webhook_payload, TokenAlertDetailsGoogleChat)

0 comments on commit f131f56

Please sign in to comment.