diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 76190a11b..fc0a9c224 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -4,7 +4,7 @@ on: push: branches: - "T4627_py3_main" - - "T5281_port_cc_token" + - "T5285_port_azure_token" jobs: tests: @@ -111,7 +111,7 @@ jobs: export CANARY_MAILGUN_API_KEY=${{ secrets.TESTING_MAILGUN_API_KEY }} export CANARY_SENTRY_ENVIRONMENT=ci cd tests - poetry run coverage run --source=../canarytokens --omit="integration/test_custom_binary.py,integration/test_sql_server_token.py" -m pytest units --runv3 + poetry run coverage run --source=../canarytokens --omit="integration/test_custom_binary.py,integration/test_sql_server_token.py" -m pytest units --runv3 -v - name: Check coverage is over threshold percentage for unit tests # Here we check coverage info on all tests @@ -128,13 +128,13 @@ jobs: sleep 10 export TEST_NETWORK=`docker network ls | grep git | python -c "from sys import stdin; print(stdin.read().split()[1])"` export TEST_HOST=`docker network inspect $TEST_NETWORK | jq '.[0].IPAM.Config[0].Gateway' | sed 's/"//g'` - LIVE=False poetry run coverage run --source=../canarytokens --omit=integration/test_custom_binary.py -m pytest integration --runv3 + LIVE=False poetry run coverage run --source=../canarytokens --omit=integration/test_custom_binary.py -m pytest integration --runv3 -v - name: Run integration tests (against V2) # Here we gather coverage info on integration tests. run: | cd tests - LIVE=True poetry run coverage run --source=integration --omit=integration/test_custom_binary.py -m pytest integration --runv2 + LIVE=True poetry run coverage run --source=integration --omit=integration/test_custom_binary.py -m pytest integration --runv2 -v - name: Check coverage is over threshold percentage for integration tests # Here we check coverage info that all integration tests where indeed run. diff --git a/canarytokens/authenticode.py b/canarytokens/authenticode.py index 0be1c62a4..187aedcd8 100644 --- a/canarytokens/authenticode.py +++ b/canarytokens/authenticode.py @@ -4,7 +4,9 @@ from canarytokens.sign_file import authenticode_sign_binary -def make_canary_authenticode_binary(nxdomain_token_url: str, filebody: bytes) -> bytes: +def make_canary_authenticode_binary( + nxdomain_token_url: str, filebody: bytes +) -> bytes: # pragma: no cover """Takes in a nxdomain url (eg: http://{token}.nxdomain.tools) and bytes string (some binary to sign) and returns bytes (the signed binary). diff --git a/canarytokens/awskeys.py b/canarytokens/awskeys.py index 54883bae0..7194c3a13 100644 --- a/canarytokens/awskeys.py +++ b/canarytokens/awskeys.py @@ -1,19 +1,12 @@ import logging import re -from typing import Literal, Optional, TypedDict +from typing import Optional import requests from pydantic import HttpUrl from canarytokens import tokens - - -class AWSKey(TypedDict): - access_key_id: str - secret_access_key: str - # TODO: make enum - region: str - output: Literal["json", "yaml", "yaml-stream", "text", "table"] +from canarytokens.models import AWSKey def validate_record(server: str, token: tokens.Canarytoken) -> bool: diff --git a/canarytokens/azurekeys.py b/canarytokens/azurekeys.py new file mode 100644 index 000000000..61bc7059c --- /dev/null +++ b/canarytokens/azurekeys.py @@ -0,0 +1,50 @@ +import logging +from typing import Optional + +import requests +from pydantic import HttpUrl + +from canarytokens.models import AzureID +from canarytokens import tokens + + +def get_azure_id( + token: tokens.Canarytoken, + server: str, + cert_file_name: str, + azure_url: Optional[HttpUrl] = None, + app_id: Optional[str] = None, + cert: Optional[str] = None, + tenant_id: Optional[str] = None, + cert_name: Optional[str] = None, +) -> AzureID: # pragma: no cover + if app_id and cert and tenant_id and cert_name: + return AzureID( + **{ + "app_id": app_id, + "cert": cert, + "tenant_id": tenant_id, + "cert_name": cert_name, + "cert_file_name": cert_file_name, + } + ) + if not (token and server) or len(server) == 0: + logging.error("Empty values passed through to get_azure_id function.") + raise ValueError("get_azure_id requires token and server to be set.") + if not azure_url: + raise ValueError("get_azure_id requires azure_url to request from.") + + data = {"token": token.value(), "domain": server} + + resp = requests.post(url=azure_url, json=data) + resp.raise_for_status() + resp_json = resp.json() + return AzureID( + **{ + "app_id": resp_json["app_id"], + "cert": resp_json["cert"], + "tenant_id": resp_json["tenant_id"], + "cert_name": resp_json["cert_name"], + "cert_file_name": cert_file_name, + } + ) diff --git a/canarytokens/canarydrop.py b/canarytokens/canarydrop.py index 117e232cf..6a0f43f65 100644 --- a/canarytokens/canarydrop.py +++ b/canarytokens/canarydrop.py @@ -112,6 +112,14 @@ class Canarydrop(BaseModel): aws_secret_access_key: Optional[str] aws_output: Optional[str] = Field(alias="output") aws_region: Optional[str] = Field(alias="region") + + # Azure key specific stuff + app_id: Optional[str] + tenant_id: Optional[str] + cert: Optional[str] + cert_name: Optional[str] + cert_file_name: Optional[str] + # HTTP style token specific stuff browser_scanner_enabled: Optional[bool] # Wireguard specific stuff diff --git a/canarytokens/channel_http.py b/canarytokens/channel_http.py index 3c0700fa5..b4a82aa6c 100644 --- a/canarytokens/channel_http.py +++ b/canarytokens/channel_http.py @@ -127,6 +127,11 @@ def render_POST(self, request: Request): canarydrop.add_canarydrop_hit(token_hit=token_hit) self.dispatch(canarydrop=canarydrop, token_hit=token_hit) return b"success" + elif canarydrop.type == TokenTypes.AZURE_ID: + token_hit = Canarytoken._parse_azure_id_trigger(request) + canarydrop.add_canarydrop_hit(token_hit=token_hit) + self.dispatch(canarydrop=canarydrop, token_hit=token_hit) + return b"success" elif canarydrop.type in [ TokenTypes.SLOW_REDIRECT, TokenTypes.WEB_IMAGE, diff --git a/canarytokens/extendtoken.py b/canarytokens/extendtoken.py index c3d7f5716..61cb0c745 100644 --- a/canarytokens/extendtoken.py +++ b/canarytokens/extendtoken.py @@ -30,7 +30,7 @@ def __init__( password, card_name, token=None, - ): + ): # pragma: no cover self.email = email self.token = token self.kind = "AMEX" @@ -45,7 +45,9 @@ def __init__( self.token = req.json().get("token") self.refresh_token = req.json().get("refresh_token") - def _post_api(self, endpoint: str, data: Optional[str] = None) -> requests.Response: + def _post_api( + self, endpoint: str, data: Optional[str] = None + ) -> requests.Response: # pragma: no cover """Performs a POST against the passed endpoint with the data passed""" headers = { "Content-Type": "application/json", @@ -163,7 +165,7 @@ def get_virtual_cards(self) -> list[tuple[str, str]]: # pragma: no cover ) return cards - def get_card_info(self, card_id) -> Optional[dict[str, str]]: + def get_card_info(self, card_id) -> Optional[dict[str, str]]: # pragma: no cover """Returns all the data about a passed card_id available""" req = self._get_api("https://v.paywithextend.com/virtualcards/" + card_id) return req.json() @@ -228,7 +230,7 @@ def make_card( address: str, billing_zip: str, limit_cents: int = 100, - ) -> CreditCard: + ) -> CreditCard: # pragma: no cover """Creates a new CreditCard via Extend's CreateVirtualCard API""" cc = self.get_parent_card_id() now_ts = datetime.datetime.now() - datetime.timedelta(days=1) @@ -283,7 +285,7 @@ def create_credit_card( last_name: Optional[str] = None, address: Optional[str] = None, billing_zip: Optional[str] = None, - ) -> CreditCard: + ) -> CreditCard: # pragma: no cover """Creates a cardholder and associated virtual card for the passed person, if not passed, will generate fake data to use""" fake_person = generate_person() if first_name is None: @@ -305,13 +307,13 @@ def create_credit_card( return cc - def get_credit_card(self, id: str) -> CreditCard: + def get_credit_card(self, id: str) -> CreditCard: # pragma: no cover """Abstract method to get a virtual credit card""" pass - def get_transaction_events( # noqa C901 # pragma: no cover + def get_transaction_events( # noqa C901 self, since: Optional[datetime.datetime] = None - ) -> list: + ) -> list: # pragma: no cover """Returns a list of recent transactions for the org""" txns = [] req = self._get_api("https://api.paywithextend.com/events") diff --git a/canarytokens/models.py b/canarytokens/models.py index 19ae5e923..0b67210d4 100644 --- a/canarytokens/models.py +++ b/canarytokens/models.py @@ -159,6 +159,22 @@ def live(self) -> bool: return strtobool(os.getenv("LIVE", "FALSE")) +class AWSKey(TypedDict): + access_key_id: str + secret_access_key: str + # TODO: make enum + region: str + output: Literal["json", "yaml", "yaml-stream", "text", "table"] + + +class AzureID(TypedDict): + app_id: str + tenant_id: str + cert: str + cert_name: str + cert_file_name: str + + class KubeCerts(TypedDict): """Kube digest (f), cert (c) and key (k) are stored directly and not base64 encoded. @@ -280,6 +296,7 @@ class TokenTypes(str, enum.Enum): SQL_SERVER = "sql_server" MY_SQL = "my_sql" AWS_KEYS = "aws_keys" + AZURE_ID = "azure_id" SIGNED_EXE = "signed_exe" FAST_REDIRECT = "fast_redirect" SLOW_REDIRECT = "slow_redirect" @@ -411,6 +428,22 @@ class AWSKeyTokenRequest(TokenRequest): token_type: Literal[TokenTypes.AWS_KEYS] = TokenTypes.AWS_KEYS +class AzureIDTokenRequest(TokenRequest): + token_type: Literal[TokenTypes.AZURE_ID] = TokenTypes.AZURE_ID + azure_id_cert_file_name: str + + def v2_dict(self) -> Dict[str, Any]: + webhook_url = self.webhook_url or "" + out_dict = { + "type": getattr(self, "token_type").value, + "email": self.email or "", + "memo": self.memo, + "webhook": str(webhook_url), + "azure_id_cert_file_name": self.azure_id_cert_file_name, + } + return out_dict + + class QRCodeTokenRequest(TokenRequest): token_type: Literal[TokenTypes.QR_CODE] = TokenTypes.QR_CODE @@ -598,6 +631,7 @@ class WindowsDirectoryTokenRequest(TokenRequest): FastRedirectTokenRequest, QRCodeTokenRequest, AWSKeyTokenRequest, + AzureIDTokenRequest, PDFTokenRequest, DNSTokenRequest, Log4ShellTokenRequest, @@ -670,6 +704,15 @@ class AWSKeyTokenResponse(TokenResponse): output: str +class AzureIDTokenResponse(TokenResponse): + token_type: Literal[TokenTypes.AZURE_ID] = TokenTypes.AZURE_ID + app_id: str + tenant_id: str + cert: str + cert_name: str + cert_file_name: str + + class PDFTokenResponse(TokenResponse): token_type: Literal[TokenTypes.ADOBE_PDF] = TokenTypes.ADOBE_PDF hostname: str # Hostname Local testing fails this check TODO: FIXME @@ -880,6 +923,7 @@ class MySQLTokenResponse(TokenResponse): CustomBinaryTokenResponse, SlowRedirectTokenResponse, AWSKeyTokenResponse, + AzureIDTokenResponse, MsWordDocumentTokenResponse, Log4ShellTokenResponse, MsExcelDocumentTokenResponse, @@ -1045,6 +1089,47 @@ def normalize_additional_info_names(cls, values: dict[str, Any]) -> dict[str, An return {k.lower(): v for k, v in values.items()} +class AzureIDAdditionalInfo(BaseModel): + azure_id_log_data: dict[str, list[str]] + microsoft_azure: dict[str, list[str]] + location: dict[str, list[str]] + coordinates: dict[str, list[str]] + + @root_validator(pre=True) + def normalize_additional_info_names(cls, values: dict[str, Any]) -> dict[str, Any]: # type: ignore + keys_to_convert = [ + # TODO: make this consistent. + ("Azure ID Log Data", "azure_id_log_data"), + ("Microsoft Azure", "microsoft_azure"), + ("Location", "location"), + ("Coordinates", "coordinates"), + ] + for old_key, new_key in keys_to_convert: # pragma: no cover + if old_key in values: + values[new_key] = values.pop(old_key) + + return {k.lower(): v for k, v in values.items()} + + def serialize_for_v2(self) -> dict: + """Serialize an `AzureIDTokenHit` into a dict + that holds the equivalent info in the v2 shape. + Returns: + dict: AzureIDTokenHit in v2 dict representation. + """ + data = self.dict() + keys_to_convert = [ + # TODO: make this consistent. + ("Azure ID Log Data", "azure_id_log_data"), + ("Microsoft Azure", "microsoft_azure"), + ("Location", "location"), + ("Coordinates", "coordinates"), + ] + for value, key in keys_to_convert: + if key in data: + data[value] = data.pop(key) + return data + + class AdditionalInfo(BaseModel): # the ServiceInfo keys are dynamic # this only works for our test @@ -1141,6 +1226,25 @@ def adjust_geo_info(cls, value): return value +class AzureIDTokenHit(TokenHit): + token_type: Literal[TokenTypes.AZURE_ID] = TokenTypes.AZURE_ID + additional_info: Optional[AzureIDAdditionalInfo] + + class Config: + allow_population_by_field_name = True + + def serialize_for_v2(self) -> dict: + """Serialize an `AzureIDTokenHit` into a dict + that holds the equivalent info in the v2 shape. + Returns: + dict: AzureIDTokenHit in v2 dict representation. + """ + data = json_safe_dict(self, exclude=("token_type", "time_of_hit")) + if "additional_info" in data: + data["additional_info"] = self.additional_info.serialize_for_v2() + return data + + class AWSKeyTokenHit(TokenHit): token_type: Literal[TokenTypes.AWS_KEYS] = TokenTypes.AWS_KEYS useragent: Optional[str] = Field( @@ -1339,6 +1443,7 @@ class WireguardTokenHit(TokenHit): CMDTokenHit, DNSTokenHit, AWSKeyTokenHit, + AzureIDTokenHit, PDFTokenHit, ClonedWebTokenHit, Log4ShellTokenHit, @@ -1443,6 +1548,11 @@ class AWSKeyTokenHistory(TokenHistory[AWSKeyTokenHit]): hits: List[AWSKeyTokenHit] +class AzureIDTokenHistory(TokenHistory): + token_type: Literal[TokenTypes.AZURE_ID] = TokenTypes.AZURE_ID + hits: List[AzureIDTokenHit] + + class DNSTokenHistory(TokenHistory[DNSTokenHit]): token_type: Literal[TokenTypes.DNS] = TokenTypes.DNS hits: List[DNSTokenHit] @@ -1600,6 +1710,7 @@ class SvnTokenHistory(TokenHistory[SvnTokenHit]): CMDTokenHistory, DNSTokenHistory, AWSKeyTokenHistory, + AzureIDTokenHistory, PDFTokenHistory, SMTPTokenHistory, ClonedWebTokenHistory, @@ -1813,6 +1924,8 @@ class DownloadFmtTypes(str, enum.Enum): MSEXCEL = "msexcel" PDF = "pdf" AWSKEYS = "awskeys" + AZUREIDCONFIG = "azure_id_config" + AZUREIDCERT = "azure_id" KUBECONFIG = "kubeconfig" SLACKAPI = "slackapi" INCIDENTLISTJSON = "incidentlist_json" @@ -1887,6 +2000,14 @@ class DownloadAWSKeysRequest(TokenDownloadRequest): fmt: Literal[DownloadFmtTypes.AWSKEYS] = DownloadFmtTypes.AWSKEYS +class DownloadAzureIDConfigRequest(TokenDownloadRequest): + fmt: Literal[DownloadFmtTypes.AZUREIDCONFIG] = DownloadFmtTypes.AZUREIDCONFIG + + +class DownloadAzureIDCertRequest(TokenDownloadRequest): + fmt: Literal[DownloadFmtTypes.AZUREIDCERT] = DownloadFmtTypes.AZUREIDCERT + + class DownloadCMDRequest(TokenDownloadRequest): fmt: Literal[DownloadFmtTypes.CMD] = DownloadFmtTypes.CMD @@ -1906,6 +2027,8 @@ class DownloadSplackApiRequest(TokenDownloadRequest): AnyDownloadRequest = Annotated[ Union[ DownloadAWSKeysRequest, + DownloadAzureIDConfigRequest, + DownloadAzureIDCertRequest, DownloadCCRequest, DownloadCMDRequest, DownloadIncidentListCSVRequest, @@ -2035,6 +2158,26 @@ class DownloadAWSKeysResponse(TokenDownloadResponse): output: str +class DownloadAzureIDConfigResponse(TokenDownloadResponse): + contenttype: Literal[ + DownloadContentTypes.TEXTPLAIN + ] = DownloadContentTypes.TEXTPLAIN + filename: str + token: str + auth: str + ... + + +class DownloadAzureIDCertResponse(TokenDownloadResponse): + contenttype: Literal[ + DownloadContentTypes.TEXTPLAIN + ] = DownloadContentTypes.TEXTPLAIN + filename: str + token: str + auth: str + ... + + class DownloadKubeconfigResponse(TokenDownloadResponse): contenttype: Literal[ DownloadContentTypes.TEXTPLAIN diff --git a/canarytokens/settings.py b/canarytokens/settings.py index e77f6b4ce..5aa23c1c3 100644 --- a/canarytokens/settings.py +++ b/canarytokens/settings.py @@ -25,6 +25,9 @@ class Settings(BaseSettings): TESTING_AWS_REGION: Optional[str] = "us-east-2" TESTING_AWS_OUTPUT: Optional[str] = "json" + AZURE_ID_TOKEN_URL: HttpUrl + AZURE_ID_TOKEN_AUTH: str + WG_PRIVATE_KEY_SEED: str WG_PRIVATE_KEY_N: str = "1000" diff --git a/canarytokens/sign_file.py b/canarytokens/sign_file.py index 2f310e82e..e685782a4 100644 --- a/canarytokens/sign_file.py +++ b/canarytokens/sign_file.py @@ -7,7 +7,7 @@ def authenticode_sign_binary( nxdomain_token_url: str, inputfile: Path, outputfile: Path -): +): # pragma: no cover try: tmpdir = tempfile.mkdtemp() diff --git a/canarytokens/tokens.py b/canarytokens/tokens.py index 2f267e797..66214c7a5 100644 --- a/canarytokens/tokens.py +++ b/canarytokens/tokens.py @@ -1,6 +1,7 @@ from __future__ import annotations import base64 +import json import random import re from datetime import datetime @@ -19,7 +20,7 @@ INPUT_CHANNEL_HTTP, ) from canarytokens.exceptions import NoCanarytokenFound -from canarytokens.models import AnyTokenHit, AWSKeyTokenHit, TokenTypes +from canarytokens.models import AnyTokenHit, AWSKeyTokenHit, AzureIDTokenHit, TokenTypes # TODO: put these in a nicer place. Ensure re.compile is called only once at startup # add a naming convention for easy reading when seen in other files. @@ -402,6 +403,70 @@ def _parse_aws_key_trigger( } return AWSKeyTokenHit(**hit_info) + @staticmethod + def _parse_azure_id_trigger( + request: Request, + ) -> AzureIDTokenHit: + """When an AzureID token is triggered, Azure makes a POST request + back to switchboard. The `request` is processed, fields extracted, + and an `AzureIDTokenHit` is created. + + Args: + request (twisted.web.http.Request): containing Azure Key hit information. + + Returns: + AzureIDTokenHit: Structured Azure Key Specific hit info. + """ + + hit_time = datetime.utcnow().strftime("%s.%f") + + json_data = json.loads(request.content.read()) + src_ip = json_data.get("ip", "127.0.0.1") + + auth_details = json_data.get("auth_details", "") + if type(auth_details) == list: + out = "" + for d in auth_details: + out += "\n{}: {}".format(d["key"], d["value"]) + auth_details = out + + location_details = json_data.get("location", {}) + geo_details = location_details.get("geoCoordinates", {}) + + additional_info = { + "Azure ID Log Data": { + "Date": [json_data.get("time", "Not Available")], + "Authentication": [auth_details], + }, + "Microsoft Azure": { + "Resource": [json_data.get("resource", "Not Available")], + "App ID": [json_data.get("app_id", "Not Available")], + "Cert ID": [json_data.get("cert_id", "Not Available")], + }, + "Location": { + "city": [location_details.get("city", "Not Available")], + "state": [location_details.get("state", "Not Available")], + "countryOrRegion": [ + location_details.get("countryOrRegion", "Not Available") + ], + }, + "Coordinates": { + "latitude": [geo_details.get("latitude", "Not Available")], + "longitude": [geo_details.get("longitude", "Not Available")], + }, + } + hit_info = { + "token_type": TokenTypes.AZURE_ID, + "time_of_hit": hit_time, + "input_channel": INPUT_CHANNEL_HTTP, + "src_ip": src_ip, + # "geo_info": geo_info, + # "is_tor_relay": is_tor_relay, + # "user_agent": user_agent, + "additional_info": additional_info, + } + return AzureIDTokenHit(**hit_info) + @staticmethod def _get_info_for_clonedsite(request): http_general_info = Canarytoken._grab_http_general_info(request=request) diff --git a/frontend/app.py b/frontend/app.py index 8d45fc113..f6e4a369c 100644 --- a/frontend/app.py +++ b/frontend/app.py @@ -37,6 +37,7 @@ from canarytokens import wireguard as wg from canarytokens.authenticode import make_canary_authenticode_binary from canarytokens.awskeys import get_aws_key +from canarytokens.azurekeys import get_azure_id from canarytokens.canarydrop import Canarydrop from canarytokens.exceptions import CanarydropAuthFailure from canarytokens.models import ( @@ -46,6 +47,8 @@ AnyTokenResponse, AWSKeyTokenRequest, AWSKeyTokenResponse, + AzureIDTokenRequest, + AzureIDTokenResponse, CCTokenRequest, CCTokenResponse, ClonedWebTokenRequest, @@ -60,6 +63,10 @@ DNSTokenResponse, DownloadAWSKeysRequest, DownloadAWSKeysResponse, + DownloadAzureIDCertRequest, + DownloadAzureIDCertResponse, + DownloadAzureIDConfigRequest, + DownloadAzureIDConfigResponse, DownloadCCRequest, DownloadCCResponse, DownloadCMDRequest, @@ -158,7 +165,7 @@ RedisIntegration(), FastApiIntegration(), ], - release=canarytokens.utils.get_deployed_commit_sha(), + release=get_deployed_commit_sha(), ) @@ -303,7 +310,7 @@ def generate_page(request: Request) -> HTMLResponse: ) async def generate(request: Request) -> AnyTokenResponse: # noqa: C901 # gen is large """ - Whatt + Generate a token and return the appropriate TokenResponse """ if request.headers.get("Content-Type", "application/json") == "application/json": @@ -687,6 +694,42 @@ def _( ) +@create_download_response.register +def _( + download_request_details: DownloadAzureIDConfigRequest, canarydrop: Canarydrop +) -> Response: + return DownloadAzureIDConfigResponse( + token=download_request_details.token, + auth=download_request_details.auth, + content=textwrap.dedent( + f""" + {{ + "appId": "{canarydrop.app_id}", + "displayName": "azure-cli-{canarydrop.cert_name}", + "fileWithCertAndPrivateKey": "{canarydrop.cert_file_name}", + "password": null, + "tenant": "{canarydrop.tenant_id}" + }} + """ + ).strip(), + filename=canarydrop.cert_file_name.replace(".pem", ".json") + if canarydrop.cert_file_name.endswith(".pem") + else canarydrop.cert_file_name, + ) + + +@create_download_response.register +def _( + download_request_details: DownloadAzureIDCertRequest, canarydrop: Canarydrop +) -> Response: + return DownloadAzureIDCertResponse( + token=download_request_details.token, + auth=download_request_details.auth, + content=canarydrop.cert, + filename=canarydrop.cert_file_name, + ) + + @create_download_response.register def _( download_request_details: DownloadKubeconfigRequest, canarydrop: Canarydrop @@ -949,6 +992,62 @@ def _create_aws_key_token_response( ) +@create_response.register +def _create_azure_id_token_response( + token_request_details: AzureIDTokenRequest, + canarydrop: Canarydrop, + settings: Optional[Settings] = None, +) -> AzureIDTokenResponse: + if settings is None: + settings = switchboard_settings + + try: + if not settings.AZURE_ID_TOKEN_URL: + raise ValueError("No URL provided for AZURE ID creation") + + if not settings.AZURE_ID_TOKEN_AUTH: + raise ValueError("No AUTH token provided for AZURE ID creation") + + key = get_azure_id( + token=canarydrop.canarytoken, + server=get_all_canary_domains()[0], + cert_file_name=token_request_details.azure_id_cert_file_name, + azure_url=HttpUrl( + f"{settings.AZURE_ID_TOKEN_URL}?code={settings.AZURE_ID_TOKEN_AUTH}", + scheme=settings.AZURE_ID_TOKEN_URL.scheme, + ), + ) + except Exception as e: + capture_exception(error=e, context=("get_azure_id", None)) + # We can fail by getting 404 from AZURE_ID_URL or failing validation + return response_error( + 4, message="Failed to generate Azure IDs. We looking into it." + ) + + canarydrop.cert_file_name = key["cert_file_name"] + canarydrop.app_id = key["app_id"] + canarydrop.cert = key["cert"] + canarydrop.tenant_id = key["tenant_id"] + canarydrop.cert_name = key["cert_name"] + canarydrop.token_url = f"{canary_http_channel}/{canarydrop.canarytoken.value()}" + save_canarydrop(canarydrop) + return AzureIDTokenResponse( + email=canarydrop.alert_email_recipient or "", + webhook_url=canarydrop.alert_webhook_url or "", + token=canarydrop.canarytoken.value(), + token_url=canarydrop.token_url, + auth_token=canarydrop.auth, + hostname=canarydrop.generated_hostname, + url_components=list(canarydrop.get_url_components()), + # additional information for Azure token response + cert_file_name=canarydrop.cert_file_name, + app_id=canarydrop.app_id, + cert=canarydrop.cert, + tenant_id=canarydrop.tenant_id, + cert_name=canarydrop.cert_name, + ) + + @create_response.register def _( token_request_details: CMDTokenRequest, canarydrop: Canarydrop diff --git a/makefile b/makefile index 0b00ff274..17ab4ea9c 100644 --- a/makefile +++ b/makefile @@ -10,29 +10,29 @@ frontend: .PHONY: testv3 testv3: - cd tests + cd tests; \ TEST_HOST=`docker network inspect canarytokens_devcontainer_default | jq '.[0].Containers | to_entries[].value | select(.Name == "canarytokens_devcontainer-app-1").IPv4Address' | sed -E 's/"//g; s/\/[0-9]+//'` \ - poetry run coverage run --source=../canarytokens -m pytest . --runv3 -v + poetry run coverage run --source=../canarytokens -m pytest . --runv3 -v; \ poetry run coverage report -m .PHONY: testv3live testv3live: - cd tests - CANARY_CHANNEL_MYSQL_PORT=3306 TEST_HOST=jingwei.tools LIVE=True poetry run coverage run --source=integration -m pytest integration --runv3 - poetry run coverage report -m + cd tests; \ + CANARY_CHANNEL_MYSQL_PORT=3306 TEST_HOST=jingwei.tools LIVE=True poetry run coverage run --source=integration -m pytest integration --runv3; \ + poetry run coverage report -m; \ .PHONY: unitsv3 unitsv3: - cd tests - poetry run coverage run --source=../canarytokens -m pytest units --runv3 + cd tests; \ + poetry run coverage run --source=../canarytokens -m pytest units --runv3 -v; \ poetry run coverage report -m .PHONY: testv2 testv2: - cd tests + cd tests; \ poetry run pytest integration --runv2 --pdb .PHONY: testv2-s testv2-s: - cd tests + cd tests; \ poetry run pytest -s integration --runv2 --pdb diff --git a/switchboard/switchboard.env.dist b/switchboard/switchboard.env.dist index 162f43989..45023caf3 100644 --- a/switchboard/switchboard.env.dist +++ b/switchboard/switchboard.env.dist @@ -27,6 +27,8 @@ CANARY_ALERT_EMAIL_SUBJECT="Canarytoken" CANARY_SENDGRID_API_KEY="" CANARY_SENDGRID_SANDBOX_MODE=True CANARY_AWSID_URL="https://not-in-use.com/test" +CANARY_AZURE_ID_TOKEN_URL="https://not-in-use.com/test" +CANARY_AZURE_ID_TOKEN_AUTH="" CANARY_WG_PRIVATE_KEY_SEED=vk/GD+frlhve/hDTTSUvqpQ/WsQtioKAri0Rt5mg7dw= CANARY_LISTEN_DOMAIN=127.0.0.1 diff --git a/templates/generate_new.html b/templates/generate_new.html index 714659fce..e8692d12d 100644 --- a/templates/generate_new.html +++ b/templates/generate_new.html @@ -119,6 +119,7 @@
Save this json config file along with the certificate:
+This canarytoken is triggered when someone uses this Service Principal Login to access Azure programmatically (through the API).
+The Service Principal Login is unique. i.e. There is no chance of somebody guessing these credentials.
+If this token fires, it is a clear indication that this Service Principal Login has "leaked".
+Ideas for use: +