This repository has been archived by the owner on Jul 29, 2024. It is now read-only.

introduce result/option types
adam-stokes committed Apr 11, 2022
1 parent 3c876a4 commit 3c4266d
Showing 12 changed files with 230 additions and 154 deletions.
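The heart of the commit is swapping sentinel return values (-1, False) for safetywrap's Result type. A minimal sketch of the Ok/Err surface the diff below relies on; is_ok, is_err, and unwrap appear in the commit itself, while unwrap_or is an assumption about the library's Rust-style API:

from safetywrap import Err, Ok, Result

def parse_port(raw: str) -> Result[int, str]:
    # Wrap success in Ok and failure in Err instead of returning a sentinel like -1.
    try:
        return Ok(int(raw))
    except ValueError as e:
        return Err(f"not a port number: {e}")

res = parse_port("8080")
if res.is_ok():
    print(res.unwrap())                  # 8080
print(parse_port("nope").is_err())       # True
print(parse_port("nope").unwrap_or(0))   # 0, assuming a Rust-style unwrap_or helper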
2 changes: 1 addition & 1 deletion .flake8
@@ -1,2 +1,2 @@
[flake8]
max-line-length=90
max-line-length=120
4 changes: 3 additions & 1 deletion .pylintrc
@@ -12,5 +12,7 @@ disable=invalid-name,
no-value-for-parameter,
duplicate-code,
arguments-differ,
abstract-method
abstract-method,
logging-not-lazy

logging-format-style=fstr
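Taken together, these two settings let the codebase keep f-string interpolation inside logging calls: disabling logging-not-lazy silences the eager-interpolation warning, and logging-format-style=fstr (a value accepted by some pylint releases) declares f-strings as the expected style. A minimal illustration with a hypothetical logger:

import logging

log = logging.getLogger("ogc")

def provision(name: str) -> None:
    # With the settings above, pylint no longer flags eager f-string
    # interpolation in logging calls.
    log.info(f"Provisioning: {name}")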
16 changes: 3 additions & 13 deletions .vscode/tasks.json
@@ -16,7 +16,7 @@
{
"label": "fix",
"type": "shell",
"command": "poetry run inv fix",
"command": "poetry run poe fix",
"problemMatcher": [],
"group": {
"kind": "build",
@@ -26,7 +26,7 @@
{
"label": "test",
"type": "shell",
"command": "poetry run inv test",
"command": "poetry run poe test",
"problemMatcher": [],
"group": {
"kind": "test",
@@ -42,17 +42,7 @@
"kind": "build",
"isDefault": true
}
},
{
"label": "celery-prod",
"type": "shell",
"command": "poetry run ogc server",
"problemMatcher": [],
"group": {
"kind": "build",
"isDefault": true
}
}

]
}
4 changes: 2 additions & 2 deletions ogc.yml
@@ -12,7 +12,7 @@ layouts:
username: ubuntu
scripts: fixtures/ex_deploy_ubuntu
provider: aws
scale: 0
scale: 1
remote-path: /home/ubuntu/ogc
exclude:
- .git
@@ -33,7 +33,7 @@ layouts:
username: ogc
scripts: fixtures/ex_deploy_windows
provider: aws
scale: 1
scale: 0
remote-path: ogc-src
exclude:
- .git
125 changes: 70 additions & 55 deletions ogc/actions.py
@@ -3,11 +3,14 @@
from functools import partial
from multiprocessing import cpu_count
from pathlib import Path
from typing import Any, Dict
from typing import Any, Iterator

import sh
from pampy import match
from safetywrap import Err, Ok, Result
from toolz.functoolz import thread_last

from ogc import db, enums, state
from ogc import db, enums, spec, state
from ogc.deployer import Deployer
from ogc.log import Logger as log
from ogc.provision import choose_provisioner
@@ -20,7 +23,7 @@
MAX_WORKERS = int(os.environ.get("OGC_MAX_WORKERS", cpu_count() - 1))


def launch(layout) -> int:
def launch(layout: dict[str, str]) -> Result[int, str]:
"""Launch a node.
Synchronous function for launching a node in a cloud environment.
@@ -31,28 +34,30 @@ def launch(layout) -> int:
from ogc import actions
app.spec = SpecLoader.load(["/Users/adam/specs/ogc.yml"])
node_ids_created = [actions.launch(layout.as_dict()) for layout in app.spec.layouts]
node_ids_created = [actions.launch(layout.as_dict())
for layout in app.spec.layouts]
Args:
layout (ogc.spec.SpecProvisionLayout): The layout specification used
layout (dict[str, str]): The layout specification used
when launching a node.
Returns:
id (int): The database row ID of the node that was deployed.
Result[int, str]: Ok(database row ID) if successful, Err with the failure message otherwise.
"""
try:
log.info(f"Provisioning: {layout['name']}")
engine = choose_provisioner(layout["provider"], env=state.app.env)
engine.setup(layout)
model = engine.create(layout=layout, env=state.app.env)
log.info(f"Saved {model.instance_name} to database")
return model.id
return Ok(int(model.id))
except Exception as e:
log.error(e)
return -1
return Err(f"Failed to launch node: {e}")


def launch_async(layouts) -> list[int]:
def launch_async(
layouts: list[spec.SpecProvisionLayout],
) -> Iterator[int]:
"""Launch a node asynchronously.
Asynchronous function for launching a node in a cloud environment.
@@ -70,18 +75,18 @@ def launch_async(layouts) -> list[int]:
when launching a node.
Returns:
ids (list[int]): The database row ID's of the node(s) that were deployed.
ids (Iterator[int]): The database row IDs of the node(s) that were deployed.
"""

with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
results = executor.map(
launch,
[layout.as_dict() for layout in layouts for _ in range(layout.scale)],
)
return results
return (result.unwrap() for result in results)
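Note that the generator returned above calls unwrap() lazily, so a single failed launch surfaces as an exception only once the caller iterates. A caller that prefers to drop failures could filter on is_ok() first; a sketch of that variant (the helper is illustrative, not part of this module):

from typing import Iterable

from safetywrap import Err, Ok, Result

def ok_ids(results: Iterable[Result[int, str]]) -> list[int]:
    # Keep only Ok values; Err results are skipped instead of raising on unwrap().
    return [r.unwrap() for r in results if r.is_ok()]

print(ok_ids([Ok(1), Err("launch failed"), Ok(3)]))  # [1, 3]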


def deploy(node_id: int) -> bool:
def deploy(node_id: int) -> Result[bool, str]:
"""Execute the deployment
Function for executing the deployment on a node.
@@ -100,28 +105,26 @@ def deploy(node_id: int) -> bool:
node (int): The node ID from the launch
Returns:
bool: True if successful, False otherwise.
Result[bool, str]: Ok(True) if the deployment succeeds, Err with the failure message otherwise.
"""

try:
with state.app.session as session:
node_obj = (
session.query(db.Node).filter(db.Node.id == node_id).first() or None
)
if node_obj:
log.info(f"Deploying to: {node_obj.instance_name}")
result = Deployer(node_obj, state.app.env).run()
result.show()
else:
log.warning(f"Could not find Node (id: {node_id})")
return False
if not node_obj:
return Err("Failed to query node")
log.info(f"Deploying to: {node_obj.instance_name}")
result = Deployer(node_obj, state.app.env).run()
result.show()
except Exception as e:
log.error(e)
return False
return True
return Err(str(e))
return Ok(True)
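With launch() and deploy() both returning a Result, the two steps compose. The and_then chaining below assumes safetywrap mirrors Rust's combinator names; the commit itself only uses is_ok, is_err, and unwrap:

layout = {"name": "ubuntu-ogc", "provider": "aws"}  # illustrative layout

outcome = launch(layout).and_then(deploy)  # Ok(True), or the first Err encountered
if outcome.is_err():
    print("provisioning or deployment failed")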


def deploy_async(nodes: list[int]) -> list[bool]:
def deploy_async(nodes: Iterator[int]) -> Result[Iterator[bool], str]:
"""Execute the deployment
Asynchronous function for executing the deployment on a node.
@@ -136,21 +139,24 @@ def deploy_async(nodes: list[int]) -> list[bool]:
script_deploy_results = actions.deploy_async(node_ids)
Args:
nodes (list[int]): The node id's from the launch
nodes (Iterator[int]): The node IDs from the launch
Returns:
list[bool]: A list of booleans from result of deployment.
Result[Iterator[bool], str]: Ok with an iterator of deployment
outcomes on success, Err otherwise.
"""
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
results = list(executor.map(deploy, [node for node in nodes if node > 0]))
return results
if any(result.is_err() for result in results):
return Err("Some nodes failed to deploy scripts")
return Ok((result.unwrap() for result in results if result.is_ok()))
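Because executor.map yields each Result only once, the results need to be materialized before they can be inspected for failures and then unwrapped. A standalone sketch of that split, assuming safetywrap results pickle cleanly across processes (which the module's own use of executor.map already implies):

from concurrent.futures import ProcessPoolExecutor

from safetywrap import Err, Ok, Result

def _half(n: int) -> Result[int, str]:
    # Stand-in worker: succeed on even input, fail on odd.
    return Ok(n // 2) if n % 2 == 0 else Err(f"{n} is odd")

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        results = list(executor.map(_half, [2, 3, 4]))
    failures = [r for r in results if r.is_err()]
    successes = [r.unwrap() for r in results if r.is_ok()]
    print(successes, len(failures))  # [1, 2] 1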


def teardown(
name: str,
force: bool = False,
only_db: bool = False,
) -> bool:
) -> Result[bool, str]:
"""Teardown deployment
Function for tearing down a node.
@@ -170,7 +176,6 @@ def teardown(
Returns:
Result[bool, str]: Ok(True) if teardown is successful, Err otherwise.
"""

result = True
with state.app.session as session:
node_data = (
@@ -207,12 +212,12 @@ def teardown(
log.warning(f"Couldn't destroy {node_data.instance_name}")
session.delete(node_data)
session.commit()
return result
return Ok(result) if result else Err("Failed to teardown node")


def teardown_async(
names: list[str], force: bool = False, only_db: bool = False
) -> list[bool]:
) -> Result[Iterator[bool], str]:
"""Teardown deployment
Async function for tearing down a node.
@@ -230,18 +235,20 @@ def teardown_async(
caution.
Returns:
list[bool]: True if teardown is successful, False otherwise.
Result[Iterator[bool], str]: Ok with an iterator of booleans if successful, Err otherwise.
"""

if not isinstance(names, list):
names = list(names)
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
func = partial(teardown, only_db=only_db, force=force)
results = list(executor.map(func, names))
return results
if any(result.is_err() for result in results):
return Err("Failed to teardown nodes")
return Ok((result.unwrap() for result in results if result.is_ok()))


def sync(layout, overrides: Dict[Any, Any]) -> bool:
def sync(layout, overrides: dict[str, str]) -> Result[bool, str]:
"""Sync a deployment
Function for syncing a deployment to correct scale.
@@ -255,34 +262,40 @@ def sync(layout, overrides: Dict[Any, Any]) -> bool:
Args:
layout (ogc.spec.SpecProvisionLayout): The layout of the deployment
overrides (dict): Override dictionary of what the new count of nodes should be
overrides (dict[str, str]): Override dictionary of what the new count of nodes should be
Returns:
bool: True if synced, False otherwise
Result[bool, str]: Ok(True) if synced, Err otherwise
"""

log.info(f"Starting deployment sync for {layout.name}...")
result = False
override = overrides[layout.name]
if override["action"] == "add":
node_id = launch(layout.as_dict())
result = deploy(node_id)
if not result:
log.error("Could not deploy node")

elif override["action"] == "remove":
def _sync_add() -> Result[bool, str]:
log.info(f"Adding deployments for {layout.name}")
return thread_last(layout.as_dict(), launch, lambda x: x.unwrap(), deploy)

def _sync_remove() -> Result[bool, str]:
log.info(f"Removing deployments for {layout.name}")
with state.app.session as session:
for data in (
data = (
session.query(db.Node)
.filter(db.Node.instance_name.endswith(layout.name))
.order_by(db.Node.id.desc())
.limit(1)
):
result = teardown(data.instance_name, force=True)
return result
)
func = partial(teardown, force=True)
return thread_last(data, lambda x: x.first().instance_name, func)

return match(
override,
{"action": "add"},
lambda x: _sync_add(),
{"action": "remove"},
lambda x: _sync_remove(),
)
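The rewritten sync() leans on two small libraries: pampy's match() dispatches on the shape of the override dict, and toolz's thread_last() pipes a value through a chain of callables. A self-contained sketch of both with made-up data; the *_a catch-alls keep the handlers agnostic about how many captures pampy passes:

from pampy import _, match
from toolz.functoolz import thread_last

override = {"action": "add", "remaining": 2}

# Dispatch on dict shape instead of chained if/elif.
plan = match(
    override,
    {"action": "add"},    lambda *_a: "scale up",
    {"action": "remove"}, lambda *_a: "scale down",
    _,                    lambda *_a: "leave as is",
)
print(plan)  # scale up

# thread_last(x, f, g) == g(f(x)): pipe a value left to right.
print(thread_last("  ubuntu-ogc  ", str.strip, str.upper))  # UBUNTU-OGC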

def sync_async(layouts, overrides: Dict[Any, Any]) -> list[bool]:

def sync_async(layouts, overrides: dict[Any, Any]) -> Result[Iterator[bool], str]:
"""Sync a deployment
Async function for syncing a deployment to correct scale.
@@ -299,7 +312,7 @@ def sync_async(layouts, overrides: Dict[Any, Any]) -> list[bool]:
overrides (dict): Override dictionary of what the new count of nodes should be
Returns:
list[bool]: True if synced, False otherwise
Result[Iterator[bool], str]: Ok with an iterator of booleans if synced, Err otherwise
"""

with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
@@ -312,7 +325,9 @@ def sync_async(layouts, overrides: Dict[Any, Any]) -> list[bool]:
for _ in range(abs(overrides[layout.name]["remaining"]))
],
)
return results
if any(result.is_err() for result in results):
return Err("Failed to sync nodes")
return Ok((result.unwrap() for result in results if result.is_ok()))


def exec(node: db.Node, cmd: str) -> bool:
@@ -361,10 +376,10 @@ def exec(node: db.Node, cmd: str) -> bool:
)
session.add(result)
session.commit()
return result.exit_code == 0
return bool(result.exit_code == 0)


def exec_async(name: str, tag: str, cmd: str) -> list[bool]:
def exec_async(name: str, tag: str, cmd: str) -> Iterator[bool]:
"""Execute command on Nodes
Async function for executing a command on a node.
@@ -434,7 +449,7 @@ def exec_scripts(node: db.Node, path: str) -> bool:
return result.passed


def exec_scripts_async(name: str, tag: str, path: str) -> list[bool]:
def exec_scripts_async(name: str, tag: str, path: str) -> Iterator[bool]:
"""Execute a scripts/template directory on a Node
Async function for executing scripts/templates on a node.
@@ -463,7 +478,7 @@ def exec_scripts_async(name: str, tag: str, path: str) -> list[bool]:
else:
rows = session.query(db.Node).all()

log.info(f"Executing scripts from '{path}' across {rows.count()} nodes.")
log.info("Executing scripts from '%s' across {%s} nodes." % path, rows.count())
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
func = partial(exec_scripts, path=path)
results = executor.map(