Skip to content

Commit

Permalink
T7860 credit card v2 implementation (#586)
Browse files Browse the repository at this point in the history
* Add Credit Card token backend

* .

* .

* .

* .

* .

* .

* .
  • Loading branch information
mclmax authored Oct 10, 2024
1 parent 2ad4869 commit 3da3744
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 7 deletions.
7 changes: 7 additions & 0 deletions canarytokens/canarydrop.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ class Canarydrop(BaseModel):
pwa_icon: Optional[PWAType]
pwa_app_name: Optional[str]

cc_v2_card_id: Optional[str]
cc_v2_card_number: Optional[str]
cc_v2_cvv: Optional[str]
cc_v2_expiry_month: Optional[int]
cc_v2_expiry_year: Optional[int]
cc_v2_name_on_card: Optional[str]

@root_validator(pre=True)
def _validate_triggered_details(cls, values):
"""
Expand Down
5 changes: 5 additions & 0 deletions canarytokens/channel_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ def render_POST(self, request: Request):
canarydrop.add_canarydrop_hit(token_hit=token_hit)
self.dispatch(canarydrop=canarydrop, token_hit=token_hit)
return b"success"
elif canarydrop.type == TokenTypes.CREDIT_CARD_V2:
token_hit = Canarytoken._parse_credit_card_v2_trigger(request)
canarydrop.add_canarydrop_hit(token_hit=token_hit)
self.dispatch(canarydrop=canarydrop, token_hit=token_hit)
return b"success"
elif canarydrop.type in [
TokenTypes.SLOW_REDIRECT,
TokenTypes.WEB_IMAGE,
Expand Down
173 changes: 173 additions & 0 deletions canarytokens/credit_card_v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import boto3
import botocore
import socket
import json

from canarytokens.settings import FrontendSettings
from canarytokens.models import Canarytoken
from dataclasses import dataclass
from typing import Optional, Tuple
from enum import Enum
from pydantic import BaseModel


frontend_settings = FrontendSettings()


_CACHED_LAMBDA_CLIENT = None
_RETRY_COUNT = 2


class _Api(Enum):
CARD_CREATE = "/card/create"
CUSTOMER_DETAILS = "/customer/details"


class Status(Enum):
SUCCESS = "success"
CUSTOMER_NOT_FOUND = "customer_not_found"
NO_AVAILABLE_CARDS = "no_cards"
NO_MORE_CREDITS = "no_more_credits"
ERROR = "error"
FORBIDDEN = "forbidden"


@dataclass(frozen=True)
class CreditCard:
card_id: str
card_number: str
cvv: str
expiry_month: int
expiry_year: int
name_on_card: str


@dataclass(frozen=True)
class Customer:
guid: str
created: str
canarytoken_domain: str
cards_quota: int
cards_assigned: int


class CreditCardTrigger(BaseModel):
canarytoken: Canarytoken
auth_code: Optional[str]
billing_amount: Optional[str]
billing_currency: Optional[str]
card_id: Optional[str]
card_nickname: Optional[str]
client_data: Optional[str]
failure_reason: Optional[str]
masked_card_number: Optional[str]
merchant: Optional[dict]
network_transaction_id: Optional[str]
posted_date: Optional[str]
retrieval_ref: Optional[str]
status: Optional[str]
transaction_amount: Optional[str]
transaction_currency: Optional[str]
transaction_date: Optional[str]
transaction_id: Optional[str]
transaction_type: Optional[str]
risk_details: Optional[dict]


def _get_lambda_client(refresh_client: bool = False):
"""Creates a botocore session and grabs sts client.
This allows for getting assumed role creds without polluting
or misusing the default boto3 session.
Using these sts creds a client is returned for a `service_name` service.
Returns:
boto3.Client: boto3 client of the Lambda service.
"""
global _CACHED_LAMBDA_CLIENT

if _CACHED_LAMBDA_CLIENT is not None and not refresh_client:
return _CACHED_LAMBDA_CLIENT

botocore_session = botocore.session.get_session()
botocore_session.set_config_variable("CREDENTIALS_FILE".lower(), "")
botocore_session.set_config_variable("SHARED_CREDENTIALS_FILE".lower(), "")
botocore_session.set_config_variable("CONFIG_FILE".lower(), "")
session = boto3.Session(botocore_session=botocore_session)
client = session.client("sts").assume_role(
RoleArn=f"arn:aws:iam::{frontend_settings.CREDIT_CARD_INFRA_ACCOUNT_ID}:role/{frontend_settings.CREDIT_CARD_INFRA_ACCESS_ROLE}",
RoleSessionName=socket.gethostname(),
)
client_session = boto3.session.Session()
_CACHED_LAMBDA_CLIENT = client_session.client(
"lambda",
region_name=frontend_settings.CREDIT_CARD_INFRA_REGION,
aws_access_key_id=client["Credentials"]["AccessKeyId"],
aws_secret_access_key=client["Credentials"]["SecretAccessKey"],
aws_session_token=client["Credentials"]["SessionToken"],
)

return _CACHED_LAMBDA_CLIENT


def _invoke_lambda(payload: dict) -> dict:
client = _get_lambda_client()

for attempt in range(_RETRY_COUNT):
try:
return client.invoke(
FunctionName=frontend_settings.CREDIT_CARD_INFRA_LAMBDA,
InvocationType="RequestResponse",
Payload=json.dumps(payload),
)
except botocore.exceptions.ClientError as err:
if (
err.response["Error"]["Code"] == "ExpiredTokenException"
and attempt < _RETRY_COUNT - 1
):
client = _get_lambda_client(refresh_client=True)
continue
raise err


def create_card(canarytoken: str) -> Tuple[Status, Optional[CreditCard]]:
if not frontend_settings.CREDIT_CARD_TOKEN_ENABLED:
return (Status.ERROR, None)

payload = {
"api": _Api.CARD_CREATE.value,
"guid": frontend_settings.CREDIT_CARD_INFRA_CUSTOMER_GUID,
"secret": frontend_settings.CREDIT_CARD_INFRA_CUSTOMER_SECRET,
"canarytoken": canarytoken,
}

response = _invoke_lambda(payload)
response_payload = json.loads(response["Payload"].read())

status = Status(response_payload.get("status"))

if status == Status.SUCCESS:
return (Status.SUCCESS, CreditCard(**response_payload["body"]["card"]))

return (status, None)


def get_customer_details() -> Tuple[Status, Optional[Customer]]:
if not frontend_settings.CREDIT_CARD_TOKEN_ENABLED:
return (Status.ERROR, None)

payload = {
"api": _Api.CUSTOMER_DETAILS.value,
"guid": frontend_settings.CREDIT_CARD_INFRA_CUSTOMER_GUID,
"secret": frontend_settings.CREDIT_CARD_INFRA_CUSTOMER_SECRET,
}

response = _invoke_lambda(payload)
response_payload = json.loads(response["Payload"].read())

status = Status(response_payload.get("status"))

if status == Status.SUCCESS:
return (Status.SUCCESS, Customer(**response_payload["body"]["customer"]))

return (status, None)
53 changes: 47 additions & 6 deletions canarytokens/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,6 @@ def get_latest_transaction(self, cc: CreditCard) -> Optional[Dict[str, str]]:
pass


# class CCToken(object):
# def __init__(self, api_provider: ApiProvider):
# pass


class TokenTypes(str, enum.Enum):
"""Enumerates all supported token types"""

Expand All @@ -290,6 +285,7 @@ class TokenTypes(str, enum.Enum):
WINDOWS_DIR = "windows_dir"
CLONEDSITE = "clonedsite"
CSSCLONEDSITE = "cssclonedsite"
CREDIT_CARD_V2 = "credit_card_v2"
QR_CODE = "qr_code"
SVN = "svn"
SMTP = "smtp"
Expand Down Expand Up @@ -347,6 +343,7 @@ def __str__(self) -> str:
TokenTypes.LOG4SHELL: "Log4Shell",
TokenTypes.CMD: "Sensitive command",
TokenTypes.CC: "Credit card",
TokenTypes.CREDIT_CARD_V2: "Credit card",
TokenTypes.PWA: "Fake app",
TokenTypes.SLACK_API: "Slack API",
TokenTypes.LEGACY: "Legacy",
Expand Down Expand Up @@ -780,6 +777,10 @@ class WindowsDirectoryTokenRequest(TokenRequest):
token_type: Literal[TokenTypes.WINDOWS_DIR] = TokenTypes.WINDOWS_DIR


class CreditCardV2TokenRequest(TokenRequest):
token_type: Literal[TokenTypes.CREDIT_CARD_V2] = TokenTypes.CREDIT_CARD_V2


AnyTokenRequest = Annotated[
Union[
CCTokenRequest,
Expand Down Expand Up @@ -807,6 +808,7 @@ class WindowsDirectoryTokenRequest(TokenRequest):
MsExcelDocumentTokenRequest,
SQLServerTokenRequest,
KubeconfigTokenRequest,
CreditCardV2TokenRequest,
],
Field(discriminator="token_type"),
]
Expand Down Expand Up @@ -1089,6 +1091,14 @@ class MySQLTokenResponse(TokenResponse):
usage: Optional[str]


class CreditCardV2TokenResponse(TokenResponse):
token_type: Literal[TokenTypes.CREDIT_CARD_V2] = TokenTypes.CREDIT_CARD_V2
card_number: str
cvv: str
expiry_month: int
expiry_year: int


AnyTokenResponse = Annotated[
Union[
CCTokenResponse,
Expand Down Expand Up @@ -1126,6 +1136,7 @@ class MySQLTokenResponse(TokenResponse):
MsWordDocumentTokenResponse,
MsExcelDocumentTokenResponse,
KubeconfigTokenResponse,
CreditCardV2TokenResponse,
],
Field(discriminator="token_type"),
]
Expand Down Expand Up @@ -1711,6 +1722,25 @@ class WireguardTokenHit(TokenHit):
src_data: WireguardSrcData


class CreditCardV2AdditionalInfo(BaseModel):
merchant: Optional[dict]
transaction_amount: Optional[str]
transaction_currency: Optional[str]


class CreditCardV2TokenHit(TokenHit):
token_type: Literal[TokenTypes.CREDIT_CARD_V2] = TokenTypes.CREDIT_CARD_V2
additional_info: Optional[CreditCardV2AdditionalInfo]

def serialize_for_v2(self) -> dict:
"""Serialize an `CreditCardV2TokenHit` into a dict
that holds the equivalent info in the v2 shape.
Returns:
dict: CreditCardV2TokenHit in v2 dict representation.
"""
return json_safe_dict(self, exclude=("token_type", "time_of_hit"))


class LegacyTokenHit(TokenHit):
# excel; word; image; QR;
token_type: Literal[TokenTypes.LEGACY] = TokenTypes.LEGACY
Expand Down Expand Up @@ -1758,6 +1788,7 @@ class LegacyTokenHit(TokenHit):
SQLServerTokenHit,
KubeconfigTokenHit,
LegacyTokenHit,
CreditCardV2TokenHit,
],
Field(discriminator="token_type"),
]
Expand Down Expand Up @@ -1794,7 +1825,11 @@ def serialize_for_v2(self, readable_time_format: bool = False) -> dict[str, str]
"""
data = {}
for hit in self.hits:
if isinstance(hit, AWSKeyTokenHit) or isinstance(hit, SlackAPITokenHit):
if (
isinstance(hit, AWSKeyTokenHit)
or isinstance(hit, SlackAPITokenHit)
or isinstance(hit, CreditCardV2TokenHit)
):
hit_data = hit.serialize_for_v2()
else:
hit_data = json_safe_dict(hit, exclude=("token_type", "time_of_hit"))
Expand Down Expand Up @@ -1956,6 +1991,11 @@ class SvnTokenHistory(TokenHistory[SvnTokenHit]):
hits: List[SvnTokenHit] = []


class CreditCardV2TokenHistory(TokenHistory[CreditCardV2TokenHit]):
token_type: Literal[TokenTypes.CREDIT_CARD_V2] = TokenTypes.CREDIT_CARD_V2
hits: List[CreditCardV2TokenHit] = []


class LegacyTokenHistory(TokenHistory[LegacyTokenHit]):
token_type: Literal[TokenTypes.LEGACY] = TokenTypes.LEGACY
hits: List[LegacyTokenHit] = []
Expand Down Expand Up @@ -1995,6 +2035,7 @@ class LegacyTokenHistory(TokenHistory[LegacyTokenHit]):
SQLServerTokenHistory,
KubeconfigTokenHistory,
LegacyTokenHistory,
CreditCardV2TokenHistory,
],
Field(discriminator="token_type"),
]
Expand Down
7 changes: 7 additions & 0 deletions canarytokens/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ class FrontendSettings(BaseSettings):
CLOUDFRONT_URL: Optional[HttpUrl]
AZUREAPP_ID: Optional[str]
AZUREAPP_SECRET: Optional[str] # TODO: Figure out SecretStr with Azure secrets
CREDIT_CARD_TOKEN_ENABLED: bool = False
CREDIT_CARD_INFRA_CUSTOMER_GUID: Optional[str]
CREDIT_CARD_INFRA_CUSTOMER_SECRET: Optional[str]
CREDIT_CARD_INFRA_LAMBDA: Optional[str]
CREDIT_CARD_INFRA_ACCOUNT_ID: Optional[str]
CREDIT_CARD_INFRA_REGION: Optional[str]
CREDIT_CARD_INFRA_ACCESS_ROLE: Optional[str]

class Config:
allow_mutation = False
Expand Down
18 changes: 18 additions & 0 deletions canarytokens/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
AzureIDTokenHit,
SlackAPITokenHit,
TokenTypes,
CreditCardV2TokenHit,
CreditCardV2AdditionalInfo,
)
from canarytokens.credit_card_v2 import CreditCardTrigger

# TODO: put these in a nicer place. Ensure re.compile is called only once at startup
# add a naming convention for easy reading when seen in other files.
Expand Down Expand Up @@ -454,6 +457,21 @@ def _parse_slack_api_trigger(request):
}
return SlackAPITokenHit(**hit_info)

@staticmethod
def _parse_credit_card_v2_trigger(
request: Request,
) -> CreditCardV2TokenHit:
request_data = json.loads(request.content.read().decode())
trigger_data = CreditCardTrigger(**request_data)

hit_time = datetime.utcnow().strftime("%s.%f")
hit_info = {
"time_of_hit": hit_time,
"input_channel": INPUT_CHANNEL_HTTP,
"additional_info": CreditCardV2AdditionalInfo(**trigger_data.dict()),
}
return CreditCardV2TokenHit(**hit_info)

@staticmethod
def _get_info_for_clonedsite(request):
http_general_info = Canarytoken._grab_http_general_info(request=request)
Expand Down
Loading

0 comments on commit 3da3744

Please sign in to comment.