Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add missing scan target kinds #121

Merged
merged 3 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ repos:
- id: isort
args: ["--profile", "black"]
- repo: https://github.com/psf/black
rev: 24.4.0
rev: 24.4.2
hooks:
- id: black
- repo: https://github.com/pycqa/flake8
Expand Down
294 changes: 114 additions & 180 deletions zanshinsdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,78 +18,45 @@
import httpx
from pydantic import BaseModel, Field

from zanshinsdk.common.enums import (
AlertSeverity,
AlertsOrderOpts,
AlertState,
Day,
Frequency,
Languages,
OAuthTargetKind,
Roles,
ScanTargetGroupKind,
ScanTargetKind,
SortOpts,
TimeOfDay,
)
from zanshinsdk.common.targets import (
ScanTargetAWS,
ScanTargetAZURE,
ScanTargetBITBUCKET,
ScanTargetDOMAIN,
ScanTargetGCP,
ScanTargetGITHUB,
ScanTargetGITLAB,
ScanTargetGroupCredentialListORACLE,
ScanTargetGWORKSPACE,
ScanTargetHUAWEI,
ScanTargetJIRA,
ScanTargetMS365,
ScanTargetORACLE,
ScanTargetSALESFORCE,
ScanTargetSLACK,
ScanTargetZENDESK,
)
from zanshinsdk.common.validators import validate_class, validate_int, validate_uuid
from zanshinsdk.version import __version__ as sdk_version

CONFIG_DIR = Path.home() / ".tenchi"
CONFIG_FILE = CONFIG_DIR / "config"


class AlertState(str, Enum):
OPEN = "OPEN"
ACTIVE = "ACTIVE"
IN_PROGRESS = "IN_PROGRESS"
RISK_ACCEPTED = "RISK_ACCEPTED"
MITIGATING_CONTROL = "MITIGATING_CONTROL"
FALSE_POSITIVE = "FALSE_POSITIVE"
CLOSED = "CLOSED"


class AlertSeverity(str, Enum):
CRITICAL = "CRITICAL"
HIGH = "HIGH"
MEDIUM = "MEDIUM"
LOW = "LOW"
INFO = "INFO"


class ScanTargetKind(str, Enum):
AWS = "AWS"
GCP = "GCP"
AZURE = "AZURE"
HUAWEI = "HUAWEI"
DOMAIN = "DOMAIN"
ORACLE = "ORACLE"


class AlertsOrderOpts(str, Enum):
SCAN_TARGET_ID = "scanTargetId"
RESOURCE = "resource"
RULE = "rule"
SEVERITY = "severity"
STATE = "state"
CREATED_AT = "createdAt"
UPDATED_AT = "updatedAt"


class SortOpts(str, Enum):
ASC = "asc"
DESC = "desc"


class Frequency(Enum):
SIX_HOURS = "6h"
TWELVE_HOURS = "12h"
DAILY = "1d"
WEEKLY = "7d"


class TimeOfDay(Enum):
MORNING = "MORNING"
AFTERNOON = "AFTERNOON"
EVENING = "EVENING"
NIGHT = "NIGHT"


class Day(Enum):
SUNDAY = "SUNDAY"
MONDAY = "MONDAY"
TUESDAY = "TUESDAY"
WEDNESDAY = "WEDNESDAY"
THURSDAY = "THURSDAY"
FRIDAY = "FRIDAY"
SATURDAY = "SATURDAY"


class ScanTargetSchedule(BaseModel):
frequency: Frequency
time_of_day: Optional[TimeOfDay] = Field(TimeOfDay.NIGHT, alias="timeOfDay")
Expand Down Expand Up @@ -119,69 +86,6 @@ def json(self):
)


class ScanTargetAWS(dict):
def __init__(self, account):
dict.__init__(self, account=account)


class ScanTargetAZURE(dict):
def __init__(self, application_id, subscription_id, directory_id, secret):
dict.__init__(
self,
applicationId=application_id,
subscriptionId=subscription_id,
directoryId=directory_id,
secret=secret,
)


class ScanTargetGCP(dict):
def __init__(self, project_id):
dict.__init__(self, projectId=project_id)


class ScanTargetHUAWEI(dict):
def __init__(self, account_id):
dict.__init__(self, accountId=account_id)


class ScanTargetDOMAIN(dict):
def __init__(self, domain):
dict.__init__(self, domain=domain)


class ScanTargetORACLE(dict):
def __init__(self, compartment_id, region, tenancy_id, user_id, key_fingerprint):
dict.__init__(
self,
compartment_id=compartment_id,
region=region,
tenancy_id=tenancy_id,
user_id=user_id,
key_fingerprint=key_fingerprint,
)


class ScanTargetGroupCredentialListORACLE(dict):
def __init__(self, region, tenancy_id, user_id, key_fingerprint):
dict.__init__(
self,
region=region,
tenancy_id=tenancy_id,
user_id=user_id,
key_fingerprint=key_fingerprint,
)


class Roles(str, Enum):
ADMIN = "ADMIN"


class Languages(str, Enum):
PT_BR = "pt-BR"
EN_US = "en-US"


class Client:
def __init__(
self,
Expand Down Expand Up @@ -1016,6 +920,14 @@ def create_organization_scan_target(
ScanTargetHUAWEI,
ScanTargetDOMAIN,
ScanTargetORACLE,
ScanTargetZENDESK,
ScanTargetGWORKSPACE,
ScanTargetSLACK,
ScanTargetBITBUCKET,
ScanTargetJIRA,
ScanTargetGITLAB,
ScanTargetSALESFORCE,
ScanTargetMS365,
],
schedule: ScanTargetSchedule = DAILY,
) -> Dict:
Expand All @@ -1030,31 +942,47 @@ def create_organization_scan_target(
* For Azure scan targets, provide *applicationId*, *subscriptionId*, *directoryId* and *secret* fields.
* For GCP scan targets, provide a *projectId* field
* For DOMAIN scan targets, provide a URL in the *domain* field
* For ZENDESK scan target, provide *instance_url* field
* For Jira scan target, provide *jira_url* field
* For MS365 scan target, provide *tenant_id*, *application_id*, *secret* fields
* For GITHUB scan target, provide *installation_id*, *organizationName* fields
* For GWORKSPACE, SLACK, BITBUCKET, GITLAB, SALESFORCE no one credential are needed
:param schedule: schedule as a string or enum version of the scan frequency
:return: a dict representing the newly created scan target
"""
validate_class(kind, ScanTargetKind)
validate_class(name, str)

if kind == ScanTargetKind.AWS:
validate_class(credential, ScanTargetAWS)
elif kind == ScanTargetKind.AZURE:
validate_class(credential, ScanTargetAZURE)
elif kind == ScanTargetKind.GCP:
validate_class(credential, ScanTargetGCP)
elif kind == ScanTargetKind.HUAWEI:
validate_class(credential, ScanTargetHUAWEI)
elif kind == ScanTargetKind.DOMAIN:
validate_class(credential, ScanTargetDOMAIN)
elif kind == ScanTargetKind.ORACLE:
validate_class(credential, ScanTargetORACLE)
validator_credential_map = {
ScanTargetKind.AWS: ScanTargetAWS,
ScanTargetKind.AZURE: ScanTargetAZURE,
ScanTargetKind.GCP: ScanTargetGCP,
ScanTargetKind.HUAWEI: ScanTargetHUAWEI,
ScanTargetKind.DOMAIN: ScanTargetDOMAIN,
ScanTargetKind.ORACLE: ScanTargetORACLE,
ScanTargetKind.ZENDESK: ScanTargetZENDESK,
ScanTargetKind.GWORKSPACE: ScanTargetGWORKSPACE,
ScanTargetKind.SLACK: ScanTargetSLACK,
ScanTargetKind.BITBUCKET: ScanTargetBITBUCKET,
ScanTargetKind.JIRA: ScanTargetJIRA,
ScanTargetKind.GITLAB: ScanTargetGITLAB,
ScanTargetKind.SALESFORCE: ScanTargetSALESFORCE,
ScanTargetKind.MS365: ScanTargetMS365,
ScanTargetKind.GITHUB: ScanTargetGITHUB,
}

if not validator_credential_map.get(kind):
raise ValueError(f"Invalid kind: {kind}")

validate_class(credential, validator_credential_map.get(kind))

body = {
"name": name,
"kind": kind,
"credential": credential,
"schedule": schedule.value(),
}

return self._request(
"POST",
f"/organizations/{validate_uuid(organization_id)}/scantargets",
Expand Down Expand Up @@ -1178,6 +1106,46 @@ def check_organization_scan_target(
f"{validate_uuid(scan_target_id)}/check",
).json()

###################################################
# Scan Target OAuth
###################################################

def get_kind_oauth_link(
self,
organization_id: Union[UUID, str],
scan_target_id: Union[UUID, str],
kind: Union[ScanTargetKind, OAuthTargetKind],
) -> Dict:
"""
Retrieve a link to authorize zanshin to read info from their target.

Mandatory for scan targets of kind:
* ZENDESK
* GWORKSPACE
* SLACK
* BITBUCKET
* JIRA
* GITLAB
* SALESFORCE
:return: a dict with the link
"""
if kind.value not in [member.value for member in OAuthTargetKind]:
raise ValueError(f"{repr(kind.value)} is not eligible for OAuth link")

scan_type = (
"scanTargetGroupId"
if kind in [ScanTargetKind.BITBUCKET, ScanTargetKind.GITLAB]
else "scanTargetId"
)

path = (
f"/oauth/link"
f"?organizationId={validate_uuid(organization_id)}"
f"&{scan_type}={validate_uuid(scan_target_id)}"
)

return self._request("GET", path).json()

def get_gworkspace_oauth_link(
self, organization_id: Union[UUID, str], scan_target_id: Union[UUID, str]
) -> Dict:
Expand Down Expand Up @@ -1278,15 +1246,18 @@ def create_scan_target_group(
"""
validate_class(kind, ScanTargetKind)
validate_class(name, str)
if kind != ScanTargetKind.ORACLE:
group_kinds = [member.value for member in ScanTargetGroupKind]

if kind not in group_kinds:
raise ValueError(
f"{repr(kind.value)} is not accepted. 'ORACLE' is expected"
f"{repr(kind.value)} is not accepted. '{group_kinds}' is expected"
)

body = {
"name": name,
"kind": kind,
}

return self._request(
"POST",
f"/organizations/{validate_uuid(organization_id)}/scantargetgroups",
Expand Down Expand Up @@ -2602,40 +2573,3 @@ def _check_boto3_installation(self):
sys.modules[package_name] = module
spec.loader.exec_module(module)
return module


def validate_int(
value, min_value=None, max_value=None, required=False
) -> Optional[int]:
if value is None:
if required:
raise ValueError("required integer parameter missing")
else:
return value
if not isinstance(value, int):
raise TypeError(f"{repr(value)} is not an integer")
if min_value and value < min_value:
raise ValueError(f"{value} shouldn't be lower than {min_value}")
if max_value and value > max_value:
raise ValueError(f"{value} shouldn't be higher than {max_value}")
return value


def validate_class(value, class_type):
if not isinstance(value, class_type):
raise TypeError(f"{repr(value)} is not an instance of {class_type.__name__}")
return value


def validate_uuid(uuid: Union[UUID, str]) -> str:
try:
if isinstance(uuid, str):
return str(UUID(uuid))

if isinstance(uuid, UUID):
return str(uuid)

raise TypeError
except (ValueError, TypeError) as ex:
ex.args = (f"{repr(uuid)} is not a valid UUID",)
raise ex
Empty file added zanshinsdk/common/__init__.py
Empty file.
Loading
Loading