Skip to content

Commit

Permalink
T5281 - Port CC Token (#205)
Browse files Browse the repository at this point in the history
  • Loading branch information
wleightond authored Mar 31, 2023
1 parent 0e0ea75 commit 1a75bc4
Show file tree
Hide file tree
Showing 35 changed files with 1,152 additions and 153 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches:
- "T4627_py3_main"
- "T5282_add_mailgun"
- "T5281_port_cc_token"

jobs:
tests:
Expand Down
36 changes: 24 additions & 12 deletions canarytokens/canarydrop.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from datetime import datetime, timedelta
from hashlib import md5
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Union
from typing import Any, Literal, Optional, Union

from pydantic import BaseModel, EmailStr, Field, HttpUrl, parse_obj_as, root_validator

Expand Down Expand Up @@ -118,6 +118,17 @@ class Canarydrop(BaseModel):
wg_key: Optional[str]
# cmd specific stuff
cmd_process: Optional[str]
# CC specific stuff
cc_id: Optional[str]
cc_kind: Optional[str]
cc_number: Optional[str]
cc_cvc: Optional[str]
cc_expiration: Optional[str]
cc_name: Optional[str]
cc_billing_zip: Optional[str]
cc_address: Optional[str]
cc_rendered_html: Optional[str]
cc_rendered_csv: Optional[str]

@root_validator(pre=True)
def _validate_triggered_details(cls, values):
Expand Down Expand Up @@ -166,7 +177,7 @@ class Config:
def add_additional_info_to_hit(
self,
hit_time: str,
additional_info: Dict[str, str],
additional_info: dict[str, str],
) -> None:
""" """
trigger_details = queries.get_canarydrop_triggered_details(self.canarytoken)
Expand Down Expand Up @@ -232,11 +243,15 @@ def get_url_components(
queries.get_all_canary_pages(),
)

def generate_random_url(self, canary_domains: List[str]):
def generate_random_url(self, canary_domains: list[str]):
"""
Return a URL generated at random with the saved Canarytoken.
The random URL is also saved into the Canarydrop.
"""
# TODO: check how we want this caching to work. Use a property if needed.
# Or @lru.cache() or as it was but it's non-obvious
if self.generated_url:
return self.generated_url
(path_elements, pages) = self.get_url_components()

generated_url = random.choice(canary_domains) + "/"
Expand All @@ -252,13 +267,11 @@ def generate_random_url(self, canary_domains: List[str]):

path.append(pages[random.randint(0, len(pages) - 1)])
generated_url += "/".join(path)
# TODO: check how we want this caching to work. Use a property if needed.
# Or @lru.cache() or as it was but it's non-obvious
# self.generated_url = generated_url
# self.generated_url
# cache
self.generated_url = generated_url
return generated_url

def get_url(self, canary_domains: List[str]):
def get_url(self, canary_domains: list[str]):
return self.generate_random_url(canary_domains)

def generate_random_hostname(self, with_random=False, nxdomain=False):
Expand All @@ -281,7 +294,6 @@ def generate_random_hostname(self, with_random=False, nxdomain=False):
return generated_hostname

def get_hostname(self, with_random=False, as_url=False, nxdomain=False):

random_hostname = self.generate_random_hostname(
with_random=with_random,
nxdomain=nxdomain,
Expand Down Expand Up @@ -344,7 +356,7 @@ def get_requested_output_channels(
):
"""Return a list containing the output channels configured in this
Canarydrop."""
channels: List[str] = []
channels: list[str] = []
if self.alert_email_enabled and self.alert_email_recipient:
channels.append(OUTPUT_CHANNEL_EMAIL)
if self.alert_webhook_enabled and self.alert_webhook_url:
Expand Down Expand Up @@ -449,12 +461,12 @@ def get_csv_incident_list(self) -> str:

return csvOutput.getvalue()

def format_triggered_details_of_history_page(self) -> Dict[str, Any]:
def format_triggered_details_of_history_page(self) -> dict[str, Any]:
"""
Helper function as history.html still relies on v2 format.
TODO: remove this when history.html is updated.
Returns:
Dict[str, Any]: v2 formatted incident list.
dict[str, Any]: v2 formatted incident list.
"""

return self.triggered_details.serialize_for_v2(readable_time_format=True)
2 changes: 0 additions & 2 deletions canarytokens/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ def format_email_canaryalert(
protocol: str,
host: str, # DESIGN: Shift this to settings. Do we need to have this logic here?
) -> TokenAlertDetails:

details = cls.gather_alert_details(
canarydrop,
protocol=protocol,
Expand Down Expand Up @@ -329,7 +328,6 @@ def send_alert(
canarydrop: Canarydrop,
token_hit: AnyTokenHit,
) -> None:

self.do_send_alert(
input_channel=input_channel,
canarydrop=canarydrop,
Expand Down
19 changes: 15 additions & 4 deletions canarytokens/channel_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# from canarytokens.channel_dns import create_token_hit
from twisted.web import resource, server
from twisted.web.resource import EncodingResourceWrapper, Resource
from twisted.web.server import GzipEncoderFactory
from twisted.web.server import GzipEncoderFactory, Request

from canarytokens import queries
from canarytokens.channel import InputChannel
Expand Down Expand Up @@ -43,7 +43,7 @@ def getChild(self, name, request):
return self
return Resource.getChild(self, name, request)

def render_GET(self, request):
def render_GET(self, request: Request):
# A GET request to a token URL can trigger one of a few responses:
# 1. Check if link has been clicked on (rather than loaded from an
# <img>) by looking at the Accept header, then:
Expand All @@ -54,7 +54,18 @@ def render_GET(self, request):
# 2b. Serve our default 1x1 gif

try:
canarytoken = Canarytoken(value=request.path)
manage_uris = [
b"/generate",
b"/download?",
b"/history?",
b"/manage?",
b"/resources/",
b"/settings",
]
if any([request.path.find(x) >= 0 for x in manage_uris]):
canarytoken = Canarytoken(value=request.path)
else:
canarytoken = Canarytoken(value=request.uri)
except NoCanarytokenFound as e:
log.info(
f"HTTP GET on path {request.path} did not correspond to a token. Error: {e}"
Expand Down Expand Up @@ -98,7 +109,7 @@ def render_GET(self, request):
request.setHeader("Server", "Apache")
return resp

def render_POST(self, request):
def render_POST(self, request: Request):
try:
token = Canarytoken(value=request.path)
except NoCanarytokenFound as e:
Expand Down
1 change: 0 additions & 1 deletion canarytokens/channel_input_smtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ def __init__(self, **kwargs):
def greeting(
self,
):

self.src_ip = self.transport.getPeer().host
try:
return self.factory.responses["greeting"]
Expand Down
1 change: 0 additions & 1 deletion canarytokens/channel_output_email.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ def sendgrid_send(
email_subject: str,
sandbox_mode: bool = False,
) -> tuple[bool, str]:

sendgrid_client = sendgrid.SendGridAPIClient(
api_key=api_key.get_secret_value().strip()
)
Expand Down
2 changes: 0 additions & 2 deletions canarytokens/channel_output_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ def do_send_alert(
canarydrop: canarydrop.Canarydrop,
token_hit: AnyTokenHit,
) -> None:

payload = input_channel.format_webhook_canaryalert(
canarydrop=canarydrop,
host=self.frontend_hostname,
Expand All @@ -42,7 +41,6 @@ def generic_webhook_send(
payload: Dict[str, str],
alert_webhook_url: HttpUrl,
) -> None:

# Design: wrap in a retry?
try:
response = requests.post(
Expand Down
14 changes: 14 additions & 0 deletions canarytokens/datagen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from faker import Faker

fake = Faker()


def generate_person() -> dict[str, str]:
address = fake.address()
billing_zip = address.split(" ")[-1]
return {
"first_name": fake.first_name(),
"last_name": fake.last_name(),
"address": address,
"billing_zip": billing_zip,
}
Loading

0 comments on commit 1a75bc4

Please sign in to comment.