diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7fb6cbf1f..76190a11b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -4,7 +4,7 @@ on: push: branches: - "T4627_py3_main" - - "T5282_add_mailgun" + - "T5281_port_cc_token" jobs: tests: diff --git a/canarytokens/canarydrop.py b/canarytokens/canarydrop.py index 87cdcd7ae..117e232cf 100644 --- a/canarytokens/canarydrop.py +++ b/canarytokens/canarydrop.py @@ -16,7 +16,7 @@ from datetime import datetime, timedelta from hashlib import md5 from pathlib import Path -from typing import Any, Dict, List, Literal, Optional, Union +from typing import Any, Literal, Optional, Union from pydantic import BaseModel, EmailStr, Field, HttpUrl, parse_obj_as, root_validator @@ -118,6 +118,17 @@ class Canarydrop(BaseModel): wg_key: Optional[str] # cmd specific stuff cmd_process: Optional[str] + # CC specific stuff + cc_id: Optional[str] + cc_kind: Optional[str] + cc_number: Optional[str] + cc_cvc: Optional[str] + cc_expiration: Optional[str] + cc_name: Optional[str] + cc_billing_zip: Optional[str] + cc_address: Optional[str] + cc_rendered_html: Optional[str] + cc_rendered_csv: Optional[str] @root_validator(pre=True) def _validate_triggered_details(cls, values): @@ -166,7 +177,7 @@ class Config: def add_additional_info_to_hit( self, hit_time: str, - additional_info: Dict[str, str], + additional_info: dict[str, str], ) -> None: """ """ trigger_details = queries.get_canarydrop_triggered_details(self.canarytoken) @@ -232,11 +243,15 @@ def get_url_components( queries.get_all_canary_pages(), ) - def generate_random_url(self, canary_domains: List[str]): + def generate_random_url(self, canary_domains: list[str]): """ Return a URL generated at random with the saved Canarytoken. The random URL is also saved into the Canarydrop. """ + # TODO: check how we want this caching to work. Use a property if needed. + # Or @lru.cache() or as it was but it's non-obvious + if self.generated_url: + return self.generated_url (path_elements, pages) = self.get_url_components() generated_url = random.choice(canary_domains) + "/" @@ -252,13 +267,11 @@ def generate_random_url(self, canary_domains: List[str]): path.append(pages[random.randint(0, len(pages) - 1)]) generated_url += "/".join(path) - # TODO: check how we want this caching to work. Use a property if needed. - # Or @lru.cache() or as it was but it's non-obvious - # self.generated_url = generated_url - # self.generated_url + # cache + self.generated_url = generated_url return generated_url - def get_url(self, canary_domains: List[str]): + def get_url(self, canary_domains: list[str]): return self.generate_random_url(canary_domains) def generate_random_hostname(self, with_random=False, nxdomain=False): @@ -281,7 +294,6 @@ def generate_random_hostname(self, with_random=False, nxdomain=False): return generated_hostname def get_hostname(self, with_random=False, as_url=False, nxdomain=False): - random_hostname = self.generate_random_hostname( with_random=with_random, nxdomain=nxdomain, @@ -344,7 +356,7 @@ def get_requested_output_channels( ): """Return a list containing the output channels configured in this Canarydrop.""" - channels: List[str] = [] + channels: list[str] = [] if self.alert_email_enabled and self.alert_email_recipient: channels.append(OUTPUT_CHANNEL_EMAIL) if self.alert_webhook_enabled and self.alert_webhook_url: @@ -449,12 +461,12 @@ def get_csv_incident_list(self) -> str: return csvOutput.getvalue() - def format_triggered_details_of_history_page(self) -> Dict[str, Any]: + def format_triggered_details_of_history_page(self) -> dict[str, Any]: """ Helper function as history.html still relies on v2 format. TODO: remove this when history.html is updated. Returns: - Dict[str, Any]: v2 formatted incident list. + dict[str, Any]: v2 formatted incident list. """ return self.triggered_details.serialize_for_v2(readable_time_format=True) diff --git a/canarytokens/channel.py b/canarytokens/channel.py index 24d209c63..dc2ee9ea1 100644 --- a/canarytokens/channel.py +++ b/canarytokens/channel.py @@ -173,7 +173,6 @@ def format_email_canaryalert( protocol: str, host: str, # DESIGN: Shift this to settings. Do we need to have this logic here? ) -> TokenAlertDetails: - details = cls.gather_alert_details( canarydrop, protocol=protocol, @@ -329,7 +328,6 @@ def send_alert( canarydrop: Canarydrop, token_hit: AnyTokenHit, ) -> None: - self.do_send_alert( input_channel=input_channel, canarydrop=canarydrop, diff --git a/canarytokens/channel_http.py b/canarytokens/channel_http.py index 30beb3ea5..3c0700fa5 100644 --- a/canarytokens/channel_http.py +++ b/canarytokens/channel_http.py @@ -8,7 +8,7 @@ # from canarytokens.channel_dns import create_token_hit from twisted.web import resource, server from twisted.web.resource import EncodingResourceWrapper, Resource -from twisted.web.server import GzipEncoderFactory +from twisted.web.server import GzipEncoderFactory, Request from canarytokens import queries from canarytokens.channel import InputChannel @@ -43,7 +43,7 @@ def getChild(self, name, request): return self return Resource.getChild(self, name, request) - def render_GET(self, request): + def render_GET(self, request: Request): # A GET request to a token URL can trigger one of a few responses: # 1. Check if link has been clicked on (rather than loaded from an # ) by looking at the Accept header, then: @@ -54,7 +54,18 @@ def render_GET(self, request): # 2b. Serve our default 1x1 gif try: - canarytoken = Canarytoken(value=request.path) + manage_uris = [ + b"/generate", + b"/download?", + b"/history?", + b"/manage?", + b"/resources/", + b"/settings", + ] + if any([request.path.find(x) >= 0 for x in manage_uris]): + canarytoken = Canarytoken(value=request.path) + else: + canarytoken = Canarytoken(value=request.uri) except NoCanarytokenFound as e: log.info( f"HTTP GET on path {request.path} did not correspond to a token. Error: {e}" @@ -98,7 +109,7 @@ def render_GET(self, request): request.setHeader("Server", "Apache") return resp - def render_POST(self, request): + def render_POST(self, request: Request): try: token = Canarytoken(value=request.path) except NoCanarytokenFound as e: diff --git a/canarytokens/channel_input_smtp.py b/canarytokens/channel_input_smtp.py index f3fdfad20..452b4f080 100644 --- a/canarytokens/channel_input_smtp.py +++ b/canarytokens/channel_input_smtp.py @@ -137,7 +137,6 @@ def __init__(self, **kwargs): def greeting( self, ): - self.src_ip = self.transport.getPeer().host try: return self.factory.responses["greeting"] diff --git a/canarytokens/channel_output_email.py b/canarytokens/channel_output_email.py index 4ddfe9400..d9422fa76 100644 --- a/canarytokens/channel_output_email.py +++ b/canarytokens/channel_output_email.py @@ -225,7 +225,6 @@ def sendgrid_send( email_subject: str, sandbox_mode: bool = False, ) -> tuple[bool, str]: - sendgrid_client = sendgrid.SendGridAPIClient( api_key=api_key.get_secret_value().strip() ) diff --git a/canarytokens/channel_output_webhook.py b/canarytokens/channel_output_webhook.py index 38f066cca..872290a52 100644 --- a/canarytokens/channel_output_webhook.py +++ b/canarytokens/channel_output_webhook.py @@ -25,7 +25,6 @@ def do_send_alert( canarydrop: canarydrop.Canarydrop, token_hit: AnyTokenHit, ) -> None: - payload = input_channel.format_webhook_canaryalert( canarydrop=canarydrop, host=self.frontend_hostname, @@ -42,7 +41,6 @@ def generic_webhook_send( payload: Dict[str, str], alert_webhook_url: HttpUrl, ) -> None: - # Design: wrap in a retry? try: response = requests.post( diff --git a/canarytokens/datagen.py b/canarytokens/datagen.py new file mode 100644 index 000000000..f74e29802 --- /dev/null +++ b/canarytokens/datagen.py @@ -0,0 +1,14 @@ +from faker import Faker + +fake = Faker() + + +def generate_person() -> dict[str, str]: + address = fake.address() + billing_zip = address.split(" ")[-1] + return { + "first_name": fake.first_name(), + "last_name": fake.last_name(), + "address": address, + "billing_zip": billing_zip, + } diff --git a/canarytokens/extendtoken.py b/canarytokens/extendtoken.py new file mode 100644 index 000000000..c3d7f5716 --- /dev/null +++ b/canarytokens/extendtoken.py @@ -0,0 +1,381 @@ +import datetime +import json +import os +from typing import Optional + +import requests + +from canarytokens.datagen import generate_person +from canarytokens.models import ApiProvider, CreditCard + + +class ExtendAPIException(Exception): + pass + + +class ExtendAPIRateLimitException(Exception): + pass + + +class ExtendAPICardsException(Exception): + pass + + +class ExtendAPI(ApiProvider): + """Class for interacting with the Extend API for virtual card management""" + + def __init__( + self, + email, + password, + card_name, + token=None, + ): + self.email = email + self.token = token + self.kind = "AMEX" + self.card_name = card_name + if self.token: + return + + req = self._post_api( + "https://api.paywithextend.com/signin", + {"email": self.email, "password": password}, + ) + self.token = req.json().get("token") + self.refresh_token = req.json().get("refresh_token") + + def _post_api(self, endpoint: str, data: Optional[str] = None) -> requests.Response: + """Performs a POST against the passed endpoint with the data passed""" + headers = { + "Content-Type": "application/json", + "Accept": "application/vnd.paywithextend.v2021-03-12+json", + } + if self.token is not None: + headers["Authorization"] = "Bearer {}".format(self.token) + resp = requests.post(endpoint, json=data, headers=headers) + if resp.status_code == 422: + raise ExtendAPIRateLimitException( + "ExtendAPI call failed with 422 rate limit." + ) + print(resp.content) + try: + json_error, text_error = resp.json().get("error", ""), "" + except requests.exceptions.JSONDecodeError: + json_error, text_error = "", resp.text + if resp.status_code != 200 or json_error: + raise ExtendAPIException( + "ExtendAPI call failed. Response code {}, error={}".format( + resp.status_code, repr(json_error + text_error) + ) + ) + return resp + + def _get_api( + self, endpoint: str, data: Optional[str] = None + ) -> requests.Response: # pragma: no cover + """Performs a GET against the passed endpoint""" + headers = { + "Content-Type": "application/json", + "Accept": "application/vnd.paywithextend.v2021-03-12+json", + } + if self.token is not None: + headers["Authorization"] = "Bearer {}".format(self.token) + resp = requests.get(endpoint, headers=headers, json=data) + if resp.status_code != 200 or resp.json().get("error", "") != "": + raise ExtendAPIException( + "ExtendAPI call failed. Response code {}, error={}".format( + resp.status_code, resp.json().get("error") + ) + ) + return resp + + def _put_api( + self, endpoint: str, data: Optional[str] = None + ) -> requests.Response: # pragma: no cover + """Performs a PUT against the passed endpoint""" + headers = { + "Content-Type": "application/json", + "Accept": "application/vnd.paywithextend.v2021-03-12+json", + } + if self.token is not None: + headers["Authorization"] = f"Bearer {self.token}" + resp = requests.put(endpoint, headers=headers, json=data) + if resp.status_code != 200 or resp.json().get("error", "") != "": + raise ExtendAPIException( + "ExtendAPI call failed. Response code {}, error={}".format( + resp.status_code, resp.json().get("error") + ) + ) + return resp + + def _delete_api(self, endpoint: str): # pragma: no cover + """Performs a DELETE against the passed endpoint""" + headers = { + "Content-Type": "application/json", + "Accept": "application/vnd.paywithextend.v2021-03-12+json", + } + if self.token: + headers["Authorization"] = f"Bearer {self.token}" + resp = requests.delete(endpoint, headers=headers) + if resp.status_code != 200 or resp.json().get("error", "") != "": + raise ExtendAPIException( + "ExtendAPI call failed. Response code {}, error={}".format( + resp.status_code, resp.json().get("error") + ) + ) + return resp + + def _refresh_auth_token(self): # pragma: no cover + """Refreshes the auth session token""" + req = self._post_api( + "https://api.paywithextend.com/renewauth", + {"refreshToken": self.refresh_token}, + ) + self.token = req.json().get("token") + self.refresh_token = req.json().get("refresh_token") + + @classmethod + def fetch_credentials(cls, path=None): # pragma: no cover + if not path: + raise Exception("No path supplied") + + if not os.path.exists(path): + raise Exception(f"File does not exist: {path}") + + with open(path) as f: + credentials = json.loads(f.read().strip()) + + return credentials["EXTEND_EMAIL_ADDRESS"], credentials["EXTEND_API_KEY"] + + def get_virtual_cards(self) -> list[tuple[str, str]]: # pragma: no cover + """Returns a list of tuples of (card owner, card id)""" + req = self._get_api( + "https://api.paywithextend.com/virtualcards?count=50&page=0" + ) + cards = [] + for vc in req.json().get("virtualCards", []): + cards.append( + ( + vc["recipient"]["firstName"] + " " + vc["recipient"]["lastName"], + vc.get("id"), + ) + ) + return cards + + def get_card_info(self, card_id) -> Optional[dict[str, str]]: + """Returns all the data about a passed card_id available""" + req = self._get_api("https://v.paywithextend.com/virtualcards/" + card_id) + return req.json() + + def get_transaction( + self, txn_id: str + ) -> Optional[dict[str, str]]: # pragma: no cover + """Returns more details about a specific transaction""" + req = self._get_api("https://api.paywithextend.com/transactions/" + txn_id) + return req.json() + + def get_card_transactions(self, card_id) -> list[dict]: # pragma: no cover + """Gets all the recent card transactions for a given card_id""" + req = self._get_api( + "https://api.paywithextend.com/virtualcards/{0}/transactions?status=DECLINED,PENDING,CLEARED".format( + card_id + ) + ) + return req.json().get("transactions", []) + + def get_latest_transaction( + self, cc: CreditCard + ) -> Optional[dict[str, str]]: # pragma: no cover + """Gets the latest transaction for a given credit card""" + txns = self.get_card_transactions(cc.id) + if len(txns) == 0: + return None + max_tx = txns[0] + max_dt = datetime.datetime.fromisoformat(max_tx["authedAt"].split("+")[0]) + for txn in txns: + dt = datetime.datetime.fromisoformat(txn["authedAt"].split("+")[0]) + if dt > max_dt: + max_dt = dt + max_tx = txn + return {max_dt.toisoformat(): max_tx} + + def get_parent_card_id(self) -> str: # pragma: no cover + """Gets the ID of the organization's real CC""" + resp = self._get_api("https://api.paywithextend.com/creditcards") + cards = resp.json().get("creditCards") + # import rpdb; rpdb.set_trace() + if len(cards) == 0: + raise ExtendAPICardsException("No cards returned from Extend") + + filtered_cards = [x for x in cards if x["displayName"] == self.card_name] + + if len(filtered_cards) == 0: + raise ExtendAPICardsException("No card is called {}".format(self.card_name)) + + if len(filtered_cards) > 1: + raise ExtendAPICardsException( + "Multiple cards are called {}".format(self.card_name) + ) + + return filtered_cards[0]["id"] + + def make_card( + self, + token_url: str, + first_name: str, + last_name: str, + address: str, + billing_zip: str, + limit_cents: int = 100, + ) -> CreditCard: + """Creates a new CreditCard via Extend's CreateVirtualCard API""" + cc = self.get_parent_card_id() + now_ts = datetime.datetime.now() - datetime.timedelta(days=1) + now_ts = now_ts.isoformat() + "+0000" + expiry = datetime.datetime.now() + datetime.timedelta(weeks=2 * 52) + future_ts = expiry.isoformat() + "+0000" + expiry_str = str(expiry.month) + "/" + str(expiry.year) + notes = token_url + data = { + "creditCardId": cc, + "recipient": self.email, + "displayName": first_name + " " + last_name + "'s card", + "balanceCents": limit_cents, + "direct": "false", + "recurs": "false", + "validFrom": now_ts, + "validTo": future_ts, + "notes": notes, + "referenceFields": [], + "validMccRanges": [{"lowest": "9403", "highest": "9403"}], + } + out = CreditCard( + id="", + name=first_name + " " + last_name, + number=None, + cvc=None, + billing_zip=billing_zip, + expiration=expiry_str, + address=address, + kind=self.kind, + ) + req = self._post_api("https://api.paywithextend.com/virtualcards", data=data) + out.id = req.json().get("virtualCard")["id"] + vc_info = self.get_card_info(out.id) + if "vcn" in vc_info["virtualCard"].keys(): + out.cvc = vc_info["virtualCard"]["securityCode"] + out.number = vc_info["virtualCard"]["vcn"] + else: + print("ERROR GETTING CARD DETAILS") + return out + + def cancel_card(self, card_id) -> None: # pragma: no cover + """Cancels a passed card""" + _ = self._put_api( + "https://api.paywithextend.com/virtualcards/" + card_id + "/cancel" + ) + + def create_credit_card( + self, + token_url: str, + first_name: Optional[str] = None, + last_name: Optional[str] = None, + address: Optional[str] = None, + billing_zip: Optional[str] = None, + ) -> CreditCard: + """Creates a cardholder and associated virtual card for the passed person, if not passed, will generate fake data to use""" + fake_person = generate_person() + if first_name is None: + first_name = fake_person["first_name"] + if last_name is None: + last_name = fake_person["last_name"] + if address is None: + address = fake_person["address"] + if billing_zip is None: + billing_zip = fake_person["billing_zip"] + + cc = self.make_card( + token_url=token_url, + first_name=first_name, + last_name=last_name, + address=address, + billing_zip=billing_zip, + ) + + return cc + + def get_credit_card(self, id: str) -> CreditCard: + """Abstract method to get a virtual credit card""" + pass + + def get_transaction_events( # noqa C901 # pragma: no cover + self, since: Optional[datetime.datetime] = None + ) -> list: + """Returns a list of recent transactions for the org""" + txns = [] + req = self._get_api("https://api.paywithextend.com/events") + for event in req.json().get("events"): + if since is not None: + if since > datetime.datetime.fromisoformat( + event["timestamp"].split("+")[0] + ): + # We've gone far enough back to not need to continue + return txns + if "transaction" in event["type"]: + txns.append(event["data"]) + if req.json().get("pagination")["numberOfPages"] > 1: + page = 1 + while page < req.json().get("pagination")["numberOfPages"]: + req = self._get_api( + "https://api.paywithextend.com/events?page={0}".format(str(page)) + ) + for event in req.json().get("events"): + if since is not None: + if since > datetime.datetime.fromisoformat( + event["timestamp"].split("+")[0] + ): + # We've gone far enough back to not need to continue + return txns + if "transaction" in event["type"]: + txns.append(event["data"]) + page += 1 + return txns + + def subscribe_to_txns(self, url: str): # pragma: no cover + """Adds a subscription to send transaction events to the passed webhook url""" + parent_cc = self.get_parent_card_id() + events = [ + "transaction.authorized", + "transaction.declined", + "transaction.reversed", + "transaction.settled", + "transaction.updated.no_match", + ] + body = {"creditCardId": parent_cc, "enabledEvents": events, "url": url} + req = self._post_api("https://api.paywithextend.com/subscriptions", body) + return req.json() + + def delete_subscription(self, sub_id): # pragma: no cover + self._delete_api("https://api.paywithextend.com/subscriptions/" + sub_id) + + def get_transaction_info_from_event(self, eventid: str): # pragma: no cover + """Returns the virtual card ID from a transaction event""" + res = self._get_api("https://api.paywithextend.com/events/" + eventid) + return res.json().get("event", {}).get("data", {}) + + def issue_test_transaction( + self, cc: CreditCard, amount: int = 1000 + ) -> None: # pragma: no cover + """Issues a test transaction to the passed card""" + return None # This API does not work for "real" cards + data = {"amount": amount, "type": "DECLINED"} + req = self._post_api( + "https://api.paywithextend.com/virtualcards/{0}/transactions/simulate".format( + cc.id + ), + data=data, + ) + if req.status_code != 200: + print("Error issuing test transaction to: " + cc.id) + return req diff --git a/canarytokens/models.py b/canarytokens/models.py index 289ec178c..19ae5e923 100644 --- a/canarytokens/models.py +++ b/canarytokens/models.py @@ -1,4 +1,6 @@ from __future__ import annotations +from abc import ABCMeta, abstractmethod +import csv import enum import json @@ -8,8 +10,9 @@ from dataclasses import dataclass from datetime import datetime from distutils.util import strtobool +from fastapi.responses import JSONResponse from functools import cached_property -from io import BytesIO +from io import BytesIO, StringIO from ipaddress import IPv4Address from tempfile import SpooledTemporaryFile from typing import ( @@ -50,6 +53,19 @@ re.IGNORECASE, ) +response_error = lambda error, message: JSONResponse( # noqa: E731 # lambda is cleaner + { + "error": str(error), + "error_message": message, + "url": "", + "url_components": None, + "token": "", + "email": "", + "hostname": "", + "auth": "", + } +) + class Memo(ConstrainedStr): max_length: int = MEMO_MAX_CHARACTERS @@ -153,6 +169,99 @@ class KubeCerts(TypedDict): k: bytes # Key +class CreditCard(BaseModel): + id: str + number: Optional[str] + cvc: Optional[str] + expiration: Optional[str] + kind: Optional[str] + name: str + billing_zip: str + address: str + + def render_html(self) -> str: + """Returns an HTML div to render the card info on a website""" + # return """