Skip to content

Commit

Permalink
Add first version CachedThreadedRunner
Browse files Browse the repository at this point in the history
  • Loading branch information
ubaumann committed Jul 17, 2024
0 parents commit ffe8041
Show file tree
Hide file tree
Showing 13 changed files with 1,026 additions and 0 deletions.
113 changes: 113 additions & 0 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
---
name: test nornir_rich
on: [push,pull_request]

jobs:
linters:
name: linters
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v1

- name: Install Poetry
uses: snok/install-poetry@v1
with:
virtualenvs-create: true
virtualenvs-in-project: true

- name: Cache Poetry virtualenv
uses: actions/cache@v2
id: cached-poetry-dependencies
with:
path: .venv
key: venv-${{ runner.os }}-${{ hashFiles('**/poetry.lock') }}

- name: Install Dependencies
if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction --no-root

- name: Run black
run: poetry run black
- name: Run mypy
run: poetry run mypy

pytest:
name: Testing on Python ${{ matrix.python-version }} (${{ matrix.platform}})
defaults:
run:
shell: bash
strategy:
matrix:
python-version: [ '3.8', '3.9', '3.10', '3.11']
platform: [ubuntu-latest, macOS-latest, windows-latest]
runs-on: ${{ matrix.platform }}
steps:
- uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
architecture: x64

- name: Install Poetry
uses: snok/install-poetry@v1
with:
virtualenvs-create: true
virtualenvs-in-project: true

- name: Cache Poetry virtualenv
uses: actions/cache@v2
id: cached-poetry-dependencies
with:
path: .venv
key: venv-${{ matrix.python-version }}-${{ runner.os }}-${{ hashFiles('**/poetry.lock') }}
if: ${{ matrix.platform != 'windows-latest' }} # windows hangs if using a cached venv

- name: Install Dependencies
run: poetry install --no-interaction --no-root

- name: Poetry show
run: poetry show

- name: Poetry env info
run: poetry env info

- name: Run pytest
run: poetry run python -m pytest -vs




release:
name: Releasing to pypi
if: github.event_name == 'push' && startsWith(github.event.ref, 'refs/tags')
needs: [linters, pytest]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v1
with:
python-version: "3.10"

- name: Install Poetry
uses: snok/install-poetry@v1

- name: prepare release
run: make fiximageurls

- name: build release
run: poetry build

- name: Publish package to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}

- uses: actions/upload-artifact@v3
with:
name: build
path: dist/*
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
**/__pycache__/
.mypy_cache/
.pytest_cache/
dist/
nornir.log
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Nornir Nuts Plugin

This repository contains Nornir plugins designed for use with Nuts.

# CachedThreaded Runner

The `CachedThreadedRunner` is an extension of the threaded runner from Nornir. Task results are cached in a class variable, and if the cache contains the task result, the cached result is returned. Be aware of the limitations: significant memory consumption is possible, and the results are shared. Therefore, modifying a Result object can lead to side effects.

```bash
pip install nornir-nuts
```

```python
InitNornir(
runner={
"plugin": "cachedThreaded",
"options": {
"num_workers": 100,
},
},
inventory={
"plugin": "SimpleInventory",
"options": {
"host_file": "tests/demo_inventory/hosts.yaml",
"group_file": "tests/demo_inventory/groups.yaml",
},
},
)
```
97 changes: 97 additions & 0 deletions demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import logging
import time
from nornir import InitNornir
from nornir.core.task import Task, Result

from random import randrange
from time import sleep


nr = InitNornir(
runner={
"plugin": "cachedThreaded",
"options": {
"num_workers": 100,
},
},
inventory={
"plugin": "SimpleInventory",
"options": {
"host_file": "tests/demo_inventory/hosts.yaml",
"group_file": "tests/demo_inventory/groups.yaml",
},
},
)


def hello_world(task: Task) -> Result:
sleep(7)
return Result(host=task.host, result=f"{task.host.name} says hello world!")


def say(task: Task, text: str) -> Result:
return Result(host=task.host, result=f"{task.host.name} says {text}")


def count(task: Task, number: int) -> Result:
count = []
for i in range(0, number):
if randrange(10) == 9:
raise Exception(f"Random exception at number {i}")
count.append(i)
return Result(host=task.host, result=f"{count}")


def greet_and_count(task: Task, number: int) -> Result:
task.run(
name="Greeting is the polite thing to do",
task=say,
text="hi!",
)

task.run(
name="Counting beans", task=count, number=number, severity_level=logging.DEBUG
)
task.run(
name="We should say bye too",
task=say,
text="bye!",
)

# let's inform if we counted even or odd times
even_or_odds = "even" if number % 2 == 1 else "odd"
return Result(host=task.host, result=f"{task.host} counted {even_or_odds} times!")


print("=" * 20, "hello_world 1st run", "=" * 20)
start = time.time()
nr.run(task=hello_world)
print(f"{time.time() - start} seconds")

print("=" * 20, "hello_world 2nd run", "=" * 20)
start = time.time()
nr.run(task=hello_world)
print(f"{time.time() - start} seconds")


print("=" * 20, "count 1st run", "=" * 20)
nr.data.reset_failed_hosts()
result = nr.run(task=count, number=10)
print(f"Hosts {result.failed_hosts.keys()} failed")
for h, r in result.failed_hosts.items():
print(f"{h}: {r[0].exception}")

print("=" * 20, "count 2nd run", "=" * 20)
nr.data.reset_failed_hosts()
result = nr.run(task=count, number=10)
print(f"Hosts {result.failed_hosts.keys()} failed")
for h, r in result.failed_hosts.items():
print(f"{h}: {r[0].exception}")

print("=" * 20, "count 3rd run", "=" * 20)
nr.data.reset_failed_hosts()
result = nr.run(task=count, number=10)
print(f"Hosts {result.failed_hosts.keys()} failed")
for h, r in result.failed_hosts.items():
print(f"{h}: {r[0].exception}")
nr.data.reset_failed_hosts()
Empty file added nornir_nuts/__init__.py
Empty file.
49 changes: 49 additions & 0 deletions nornir_nuts/runners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import inspect
from typing import List, Dict, Tuple, FrozenSet, Optional, Any
from types import ModuleType
from concurrent.futures import ThreadPoolExecutor

from nornir.core.task import AggregatedResult, Task, MultiResult
from nornir.core.inventory import Host


class CachedThreadedRunner:
"""
CachedThreadedRunner returns cached result or runs the task over each host using threads.
CachedThreadedRunner is an updated version of the Nornir ThreadedRunner
Arguments:
num_workers: number of threads to use
"""

CACHE: Dict[
Tuple[Tuple[FrozenSet[Tuple[str, Any]], bool, str, Optional[ModuleType]], str],
MultiResult,
] = dict()

def __init__(self, num_workers: int = 20) -> None:
self.num_workers = num_workers

def run(self, task: Task, hosts: List[Host]) -> AggregatedResult:
task_properties = (
frozenset(task.params.items()),
task.global_dry_run,
task.task.__name__, # Function name in code
inspect.getmodule(task.task), # module name and file
)

result = AggregatedResult(task.name)
futures = []
with ThreadPoolExecutor(self.num_workers) as pool:
for host in hosts:
if cached_result := self.CACHE.get((task_properties, host.name), None):
result[cached_result.host.name] = cached_result
continue
future = pool.submit(task.copy().start, host)
futures.append(future)

for future in futures:
worker_result = future.result()
self.CACHE[(task_properties, worker_result.host.name)] = worker_result
result[worker_result.host.name] = worker_result
return result
Loading

0 comments on commit ffe8041

Please sign in to comment.