diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml index f7b5c2e..8a6ead2 100644 --- a/.github/workflows/build-and-test.yaml +++ b/.github/workflows/build-and-test.yaml @@ -42,6 +42,11 @@ jobs: steps: - name: Checkout uses: actions/checkout@v3 # v3 + - uses: actions/setup-python@v4 + with: + python-version: '3.11' + - name: Install dependencies + run: python -m pip install tox - name: Set channel run: | juju_channel=$(echo "${{ matrix.agent-versions }}" | cut -c 1-3) diff --git a/.github/workflows/cla_check.yaml b/.github/workflows/cla_check.yaml new file mode 100644 index 0000000..ec8abbf --- /dev/null +++ b/.github/workflows/cla_check.yaml @@ -0,0 +1,12 @@ +name: CLA check + +on: + pull_request: + branches: [main] + +jobs: + cla-check: + runs-on: ubuntu-22.04 + steps: + - name: Check if Canonical's Contributor License Agreement has been signed + uses: canonical/has-signed-canonical-cla@v1 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f0f059a..9641029 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -5,7 +5,7 @@ This documents explains the processes and practices recommended for contributing enhancements to this operator. -- Generally, before developing enhancements to this charm, you should consider [opening an issue +- Generally, before developing enhancements to this charm, consider [opening an issue ](https://github.com/canonical/nats-operator/issues) explaining your use case. - Familiarising yourself with the [Charmed Operator Framework](https://juju.is/docs/sdk) library will help you a lot when working on new features or bug fixes. @@ -13,11 +13,18 @@ this operator. - code quality - test coverage - user experience for Juju administrators of this charm. -- Please help us out in ensuring easy to review branches by rebasing your pull request branch onto - the `master` branch. This also avoids merge commits and creates a linear Git commit history. +- It is a good practice to rebase your pull request branch onto the `main` branch for a linear +commit history, avoiding merge commits and easy reviews. ## Developing +### Prerequisites + +To run integration tests you require a juju deployment with a juju controller ready. You can refer to +[how to setup a juju deployment](https://juju.is/docs/juju/get-started-with-juju). + + +### Develop You can create an environment for development with `tox`: ```shell @@ -25,7 +32,7 @@ tox devenv -e integration-juju3 source venv/bin/activate ``` -### Testing +### Test ```shell tox run -e format # update your code according to linting rules @@ -36,7 +43,7 @@ tox run -e integration-juju3 # integration tests for juju 3.2 tox # runs 'lint' and 'unit' environments ``` -## Build charm +### Build Build the charm in this git repository using: @@ -60,5 +67,5 @@ juju deploy ./nats_ubuntu-22.04-amd64.charm ## Canonical Contributor Agreement Canonical welcomes contributions to the NATS Operator. Please check out our -[contributor agreement](https://ubuntu.com/legal/contributors)if you're +[contributor agreement](https://ubuntu.com/legal/contributors) if you're interested in contributing to the solution. diff --git a/charmcraft.yaml b/charmcraft.yaml index 0b46f60..b3567ff 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -3,6 +3,8 @@ bases: - build-on: - name: "ubuntu" channel: "20.04" + - name: "ubuntu" + channel: "22.04" run-on: - name: "ubuntu" channel: "20.04" diff --git a/lib/charms/operator_libs_linux/v2/snap.py b/lib/charms/operator_libs_linux/v2/snap.py new file mode 100644 index 0000000..38c88cf --- /dev/null +++ b/lib/charms/operator_libs_linux/v2/snap.py @@ -0,0 +1,1099 @@ +# Copyright 2021 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Representations of the system's Snaps, and abstractions around managing them. + +The `snap` module provides convenience methods for listing, installing, refreshing, and removing +Snap packages, in addition to setting and getting configuration options for them. + +In the `snap` module, `SnapCache` creates a dict-like mapping of `Snap` objects at when +instantiated. Installed snaps are fully populated, and available snaps are lazily-loaded upon +request. This module relies on an installed and running `snapd` daemon to perform operations over +the `snapd` HTTP API. + +`SnapCache` objects can be used to install or modify Snap packages by name in a manner similar to +using the `snap` command from the commandline. + +An example of adding Juju to the system with `SnapCache` and setting a config value: + +```python +try: + cache = snap.SnapCache() + juju = cache["juju"] + + if not juju.present: + juju.ensure(snap.SnapState.Latest, channel="beta") + juju.set({"some.key": "value", "some.key2": "value2"}) +except snap.SnapError as e: + logger.error("An exception occurred when installing charmcraft. Reason: %s", e.message) +``` + +In addition, the `snap` module provides "bare" methods which can act on Snap packages as +simple function calls. :meth:`add`, :meth:`remove`, and :meth:`ensure` are provided, as +well as :meth:`add_local` for installing directly from a local `.snap` file. These return +`Snap` objects. + +As an example of installing several Snaps and checking details: + +```python +try: + nextcloud, charmcraft = snap.add(["nextcloud", "charmcraft"]) + if nextcloud.get("mode") != "production": + nextcloud.set({"mode": "production"}) +except snap.SnapError as e: + logger.error("An exception occurred when installing snaps. Reason: %s" % e.message) +``` +""" + +import http.client +import json +import logging +import os +import re +import socket +import subprocess +import sys +import urllib.error +import urllib.parse +import urllib.request +from collections.abc import Mapping +from datetime import datetime, timedelta, timezone +from enum import Enum +from subprocess import CalledProcessError, CompletedProcess +from typing import Any, Dict, Iterable, List, Optional, Union + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "05394e5893f94f2d90feb7cbe6b633cd" + +# Increment this major API version when introducing breaking changes +LIBAPI = 2 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 3 + + +# Regex to locate 7-bit C1 ANSI sequences +ansi_filter = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + + +def _cache_init(func): + def inner(*args, **kwargs): + if _Cache.cache is None: + _Cache.cache = SnapCache() + return func(*args, **kwargs) + + return inner + + +# recursive hints seems to error out pytest +JSONType = Union[Dict[str, Any], List[Any], str, int, float] + + +class SnapService: + """Data wrapper for snap services.""" + + def __init__( + self, + daemon: Optional[str] = None, + daemon_scope: Optional[str] = None, + enabled: bool = False, + active: bool = False, + activators: List[str] = [], + **kwargs, + ): + self.daemon = daemon + self.daemon_scope = kwargs.get("daemon-scope", None) or daemon_scope + self.enabled = enabled + self.active = active + self.activators = activators + + def as_dict(self) -> Dict: + """Return instance representation as dict.""" + return { + "daemon": self.daemon, + "daemon_scope": self.daemon_scope, + "enabled": self.enabled, + "active": self.active, + "activators": self.activators, + } + + +class MetaCache(type): + """MetaCache class used for initialising the snap cache.""" + + @property + def cache(cls) -> "SnapCache": + """Property for returning the snap cache.""" + return cls._cache + + @cache.setter + def cache(cls, cache: "SnapCache") -> None: + """Setter for the snap cache.""" + cls._cache = cache + + def __getitem__(cls, name) -> "Snap": + """Snap cache getter.""" + return cls._cache[name] + + +class _Cache(object, metaclass=MetaCache): + _cache = None + + +class Error(Exception): + """Base class of most errors raised by this library.""" + + def __repr__(self): + """Represent the Error class.""" + return "<{}.{} {}>".format(type(self).__module__, type(self).__name__, self.args) + + @property + def name(self): + """Return a string representation of the model plus class.""" + return "<{}.{}>".format(type(self).__module__, type(self).__name__) + + @property + def message(self): + """Return the message passed as an argument.""" + return self.args[0] + + +class SnapAPIError(Error): + """Raised when an HTTP API error occurs talking to the Snapd server.""" + + def __init__(self, body: Dict, code: int, status: str, message: str): + super().__init__(message) # Makes str(e) return message + self.body = body + self.code = code + self.status = status + self._message = message + + def __repr__(self): + """Represent the SnapAPIError class.""" + return "APIError({!r}, {!r}, {!r}, {!r})".format( + self.body, self.code, self.status, self._message + ) + + +class SnapState(Enum): + """The state of a snap on the system or in the cache.""" + + Present = "present" + Absent = "absent" + Latest = "latest" + Available = "available" + + +class SnapError(Error): + """Raised when there's an error running snap control commands.""" + + +class SnapNotFoundError(Error): + """Raised when a requested snap is not known to the system.""" + + +class Snap(object): + """Represents a snap package and its properties. + + `Snap` exposes the following properties about a snap: + - name: the name of the snap + - state: a `SnapState` representation of its install status + - channel: "stable", "candidate", "beta", and "edge" are common + - revision: a string representing the snap's revision + - confinement: "classic" or "strict" + """ + + def __init__( + self, + name, + state: SnapState, + channel: str, + revision: str, + confinement: str, + apps: Optional[List[Dict[str, str]]] = None, + cohort: Optional[str] = "", + ) -> None: + self._name = name + self._state = state + self._channel = channel + self._revision = revision + self._confinement = confinement + self._cohort = cohort + self._apps = apps or [] + self._snap_client = SnapClient() + + def __eq__(self, other) -> bool: + """Equality for comparison.""" + return isinstance(other, self.__class__) and ( + self._name, + self._revision, + ) == (other._name, other._revision) + + def __hash__(self): + """Calculate a hash for this snap.""" + return hash((self._name, self._revision)) + + def __repr__(self): + """Represent the object such that it can be reconstructed.""" + return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__) + + def __str__(self): + """Represent the snap object as a string.""" + return "<{}: {}-{}.{} -- {}>".format( + self.__class__.__name__, + self._name, + self._revision, + self._channel, + str(self._state), + ) + + def _snap(self, command: str, optargs: Optional[Iterable[str]] = None) -> str: + """Perform a snap operation. + + Args: + command: the snap command to execute + optargs: an (optional) list of additional arguments to pass, + commonly confinement or channel + + Raises: + SnapError if there is a problem encountered + """ + optargs = optargs or [] + args = ["snap", command, self._name, *optargs] + try: + return subprocess.check_output(args, universal_newlines=True) + except CalledProcessError as e: + raise SnapError( + "Snap: {!r}; command {!r} failed with output = {!r}".format( + self._name, args, e.output + ) + ) + + def _snap_daemons( + self, + command: List[str], + services: Optional[List[str]] = None, + ) -> CompletedProcess: + """Perform snap app commands. + + Args: + command: the snap command to execute + services: the snap service to execute command on + + Raises: + SnapError if there is a problem encountered + """ + if services: + # an attempt to keep the command constrained to the snap instance's services + services = ["{}.{}".format(self._name, service) for service in services] + else: + services = [self._name] + + args = ["snap", *command, *services] + + try: + return subprocess.run(args, universal_newlines=True, check=True, capture_output=True) + except CalledProcessError as e: + raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr)) + + def get(self, key: Optional[str], *, typed: bool = False) -> Any: + """Fetch snap configuration values. + + Args: + key: the key to retrieve. Default to retrieve all values for typed=True. + typed: set to True to retrieve typed values (set with typed=True). + Default is to return a string. + """ + if typed: + config = json.loads(self._snap("get", ["-d", key])) + if key: + return config.get(key) + return config + + if not key: + raise TypeError("Key must be provided when typed=False") + + return self._snap("get", [key]).strip() + + def set(self, config: Dict[str, Any], *, typed: bool = False) -> str: + """Set a snap configuration value. + + Args: + config: a dictionary containing keys and values specifying the config to set. + typed: set to True to convert all values in the config into typed values while + configuring the snap (set with typed=True). Default is not to convert. + """ + if typed: + kv = [f"{key}={json.dumps(val)}" for key, val in config.items()] + return self._snap("set", ["-t"] + kv) + + return self._snap("set", [f"{key}={val}" for key, val in config.items()]) + + def unset(self, key) -> str: + """Unset a snap configuration value. + + Args: + key: the key to unset + """ + return self._snap("unset", [key]) + + def start(self, services: Optional[List[str]] = None, enable: Optional[bool] = False) -> None: + """Start a snap's services. + + Args: + services (list): (optional) list of individual snap services to start (otherwise all) + enable (bool): (optional) flag to enable snap services on start. Default `false` + """ + args = ["start", "--enable"] if enable else ["start"] + self._snap_daemons(args, services) + + def stop(self, services: Optional[List[str]] = None, disable: Optional[bool] = False) -> None: + """Stop a snap's services. + + Args: + services (list): (optional) list of individual snap services to stop (otherwise all) + disable (bool): (optional) flag to disable snap services on stop. Default `False` + """ + args = ["stop", "--disable"] if disable else ["stop"] + self._snap_daemons(args, services) + + def logs(self, services: Optional[List[str]] = None, num_lines: Optional[int] = 10) -> str: + """Fetch a snap services' logs. + + Args: + services (list): (optional) list of individual snap services to show logs from + (otherwise all) + num_lines (int): (optional) integer number of log lines to return. Default `10` + """ + args = ["logs", "-n={}".format(num_lines)] if num_lines else ["logs"] + return self._snap_daemons(args, services).stdout + + def connect( + self, plug: str, service: Optional[str] = None, slot: Optional[str] = None + ) -> None: + """Connect a plug to a slot. + + Args: + plug (str): the plug to connect + service (str): (optional) the snap service name to plug into + slot (str): (optional) the snap service slot to plug in to + + Raises: + SnapError if there is a problem encountered + """ + command = ["connect", "{}:{}".format(self._name, plug)] + + if service and slot: + command = command + ["{}:{}".format(service, slot)] + elif slot: + command = command + [slot] + + args = ["snap", *command] + try: + subprocess.run(args, universal_newlines=True, check=True, capture_output=True) + except CalledProcessError as e: + raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr)) + + def hold(self, duration: Optional[timedelta] = None) -> None: + """Add a refresh hold to a snap. + + Args: + duration: duration for the hold, or None (the default) to hold this snap indefinitely. + """ + hold_str = "forever" + if duration is not None: + seconds = round(duration.total_seconds()) + hold_str = f"{seconds}s" + self._snap("refresh", [f"--hold={hold_str}"]) + + def unhold(self) -> None: + """Remove the refresh hold of a snap.""" + self._snap("refresh", ["--unhold"]) + + def alias(self, application: str, alias: Optional[str] = None) -> None: + """Create an alias for a given application. + + Args: + application: application to get an alias. + alias: (optional) name of the alias; if not provided, the application name is used. + """ + if alias is None: + alias = application + args = ["snap", "alias", f"{self.name}.{application}", alias] + try: + subprocess.check_output(args, universal_newlines=True) + except CalledProcessError as e: + raise SnapError( + "Snap: {!r}; command {!r} failed with output = {!r}".format( + self._name, args, e.output + ) + ) + + def restart( + self, services: Optional[List[str]] = None, reload: Optional[bool] = False + ) -> None: + """Restarts a snap's services. + + Args: + services (list): (optional) list of individual snap services to restart. + (otherwise all) + reload (bool): (optional) flag to use the service reload command, if available. + Default `False` + """ + args = ["restart", "--reload"] if reload else ["restart"] + self._snap_daemons(args, services) + + def _install( + self, + channel: Optional[str] = "", + cohort: Optional[str] = "", + revision: Optional[str] = None, + ) -> None: + """Add a snap to the system. + + Args: + channel: the channel to install from + cohort: optional, the key of a cohort that this snap belongs to + revision: optional, the revision of the snap to install + """ + cohort = cohort or self._cohort + + args = [] + if self.confinement == "classic": + args.append("--classic") + if channel: + args.append('--channel="{}"'.format(channel)) + if revision: + args.append('--revision="{}"'.format(revision)) + if cohort: + args.append('--cohort="{}"'.format(cohort)) + + self._snap("install", args) + + def _refresh( + self, + channel: Optional[str] = "", + cohort: Optional[str] = "", + revision: Optional[str] = None, + leave_cohort: Optional[bool] = False, + ) -> None: + """Refresh a snap. + + Args: + channel: the channel to install from + cohort: optionally, specify a cohort. + revision: optionally, specify the revision of the snap to refresh + leave_cohort: leave the current cohort. + """ + args = [] + if channel: + args.append('--channel="{}"'.format(channel)) + + if revision: + args.append('--revision="{}"'.format(revision)) + + if not cohort: + cohort = self._cohort + + if leave_cohort: + self._cohort = "" + args.append("--leave-cohort") + elif cohort: + args.append('--cohort="{}"'.format(cohort)) + + self._snap("refresh", args) + + def _remove(self) -> str: + """Remove a snap from the system.""" + return self._snap("remove") + + @property + def name(self) -> str: + """Returns the name of the snap.""" + return self._name + + def ensure( + self, + state: SnapState, + classic: Optional[bool] = False, + channel: Optional[str] = "", + cohort: Optional[str] = "", + revision: Optional[str] = None, + ): + """Ensure that a snap is in a given state. + + Args: + state: a `SnapState` to reconcile to. + classic: an (Optional) boolean indicating whether classic confinement should be used + channel: the channel to install from + cohort: optional. Specify the key of a snap cohort. + revision: optional. the revision of the snap to install/refresh + + While both channel and revision could be specified, the underlying snap install/refresh + command will determine which one takes precedence (revision at this time) + + Raises: + SnapError if an error is encountered + """ + self._confinement = "classic" if classic or self._confinement == "classic" else "" + + if state not in (SnapState.Present, SnapState.Latest): + # We are attempting to remove this snap. + if self._state in (SnapState.Present, SnapState.Latest): + # The snap is installed, so we run _remove. + self._remove() + else: + # The snap is not installed -- no need to do anything. + pass + else: + # We are installing or refreshing a snap. + if self._state not in (SnapState.Present, SnapState.Latest): + # The snap is not installed, so we install it. + self._install(channel, cohort, revision) + else: + # The snap is installed, but we are changing it (e.g., switching channels). + self._refresh(channel, cohort, revision) + + self._update_snap_apps() + self._state = state + + def _update_snap_apps(self) -> None: + """Update a snap's apps after snap changes state.""" + try: + self._apps = self._snap_client.get_installed_snap_apps(self._name) + except SnapAPIError: + logger.debug("Unable to retrieve snap apps for {}".format(self._name)) + self._apps = [] + + @property + def present(self) -> bool: + """Report whether or not a snap is present.""" + return self._state in (SnapState.Present, SnapState.Latest) + + @property + def latest(self) -> bool: + """Report whether the snap is the most recent version.""" + return self._state is SnapState.Latest + + @property + def state(self) -> SnapState: + """Report the current snap state.""" + return self._state + + @state.setter + def state(self, state: SnapState) -> None: + """Set the snap state to a given value. + + Args: + state: a `SnapState` to reconcile the snap to. + + Raises: + SnapError if an error is encountered + """ + if self._state is not state: + self.ensure(state) + self._state = state + + @property + def revision(self) -> str: + """Returns the revision for a snap.""" + return self._revision + + @property + def channel(self) -> str: + """Returns the channel for a snap.""" + return self._channel + + @property + def confinement(self) -> str: + """Returns the confinement for a snap.""" + return self._confinement + + @property + def apps(self) -> List: + """Returns (if any) the installed apps of the snap.""" + self._update_snap_apps() + return self._apps + + @property + def services(self) -> Dict: + """Returns (if any) the installed services of the snap.""" + self._update_snap_apps() + services = {} + for app in self._apps: + if "daemon" in app: + services[app["name"]] = SnapService(**app).as_dict() + + return services + + @property + def held(self) -> bool: + """Report whether the snap has a hold.""" + info = self._snap("info") + return "hold:" in info + + +class _UnixSocketConnection(http.client.HTTPConnection): + """Implementation of HTTPConnection that connects to a named Unix socket.""" + + def __init__(self, host, timeout=None, socket_path=None): + if timeout is None: + super().__init__(host) + else: + super().__init__(host, timeout=timeout) + self.socket_path = socket_path + + def connect(self): + """Override connect to use Unix socket (instead of TCP socket).""" + if not hasattr(socket, "AF_UNIX"): + raise NotImplementedError("Unix sockets not supported on {}".format(sys.platform)) + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.connect(self.socket_path) + if self.timeout is not None: + self.sock.settimeout(self.timeout) + + +class _UnixSocketHandler(urllib.request.AbstractHTTPHandler): + """Implementation of HTTPHandler that uses a named Unix socket.""" + + def __init__(self, socket_path: str): + super().__init__() + self.socket_path = socket_path + + def http_open(self, req) -> http.client.HTTPResponse: + """Override http_open to use a Unix socket connection (instead of TCP).""" + return self.do_open(_UnixSocketConnection, req, socket_path=self.socket_path) + + +class SnapClient: + """Snapd API client to talk to HTTP over UNIX sockets. + + In order to avoid shelling out and/or involving sudo in calling the snapd API, + use a wrapper based on the Pebble Client, trimmed down to only the utility methods + needed for talking to snapd. + """ + + def __init__( + self, + socket_path: str = "/run/snapd.socket", + opener: Optional[urllib.request.OpenerDirector] = None, + base_url: str = "http://localhost/v2/", + timeout: float = 30.0, + ): + """Initialize a client instance. + + Args: + socket_path: a path to the socket on the filesystem. Defaults to /run/snap/snapd.socket + opener: specifies an opener for unix socket, if unspecified a default is used + base_url: base url for making requests to the snap client. Defaults to + http://localhost/v2/ + timeout: timeout in seconds to use when making requests to the API. Default is 30.0s. + """ + if opener is None: + opener = self._get_default_opener(socket_path) + self.opener = opener + self.base_url = base_url + self.timeout = timeout + + @classmethod + def _get_default_opener(cls, socket_path): + """Build the default opener to use for requests (HTTP over Unix socket).""" + opener = urllib.request.OpenerDirector() + opener.add_handler(_UnixSocketHandler(socket_path)) + opener.add_handler(urllib.request.HTTPDefaultErrorHandler()) + opener.add_handler(urllib.request.HTTPRedirectHandler()) + opener.add_handler(urllib.request.HTTPErrorProcessor()) + return opener + + def _request( + self, + method: str, + path: str, + query: Dict = None, + body: Dict = None, + ) -> JSONType: + """Make a JSON request to the Snapd server with the given HTTP method and path. + + If query dict is provided, it is encoded and appended as a query string + to the URL. If body dict is provided, it is serialied as JSON and used + as the HTTP body (with Content-Type: "application/json"). The resulting + body is decoded from JSON. + """ + headers = {"Accept": "application/json"} + data = None + if body is not None: + data = json.dumps(body).encode("utf-8") + headers["Content-Type"] = "application/json" + + response = self._request_raw(method, path, query, headers, data) + return json.loads(response.read().decode())["result"] + + def _request_raw( + self, + method: str, + path: str, + query: Dict = None, + headers: Dict = None, + data: bytes = None, + ) -> http.client.HTTPResponse: + """Make a request to the Snapd server; return the raw HTTPResponse object.""" + url = self.base_url + path + if query: + url = url + "?" + urllib.parse.urlencode(query) + + if headers is None: + headers = {} + request = urllib.request.Request(url, method=method, data=data, headers=headers) + + try: + response = self.opener.open(request, timeout=self.timeout) + except urllib.error.HTTPError as e: + code = e.code + status = e.reason + message = "" + try: + body = json.loads(e.read().decode())["result"] + except (IOError, ValueError, KeyError) as e2: + # Will only happen on read error or if Pebble sends invalid JSON. + body = {} + message = "{} - {}".format(type(e2).__name__, e2) + raise SnapAPIError(body, code, status, message) + except urllib.error.URLError as e: + raise SnapAPIError({}, 500, "Not found", e.reason) + return response + + def get_installed_snaps(self) -> Dict: + """Get information about currently installed snaps.""" + return self._request("GET", "snaps") + + def get_snap_information(self, name: str) -> Dict: + """Query the snap server for information about single snap.""" + return self._request("GET", "find", {"name": name})[0] + + def get_installed_snap_apps(self, name: str) -> List: + """Query the snap server for apps belonging to a named, currently installed snap.""" + return self._request("GET", "apps", {"names": name, "select": "service"}) + + +class SnapCache(Mapping): + """An abstraction to represent installed/available packages. + + When instantiated, `SnapCache` iterates through the list of installed + snaps using the `snapd` HTTP API, and a list of available snaps by reading + the filesystem to populate the cache. Information about available snaps is lazily-loaded + from the `snapd` API when requested. + """ + + def __init__(self): + if not self.snapd_installed: + raise SnapError("snapd is not installed or not in /usr/bin") from None + self._snap_client = SnapClient() + self._snap_map = {} + if self.snapd_installed: + self._load_available_snaps() + self._load_installed_snaps() + + def __contains__(self, key: str) -> bool: + """Check if a given snap is in the cache.""" + return key in self._snap_map + + def __len__(self) -> int: + """Report number of items in the snap cache.""" + return len(self._snap_map) + + def __iter__(self) -> Iterable["Snap"]: + """Provide iterator for the snap cache.""" + return iter(self._snap_map.values()) + + def __getitem__(self, snap_name: str) -> Snap: + """Return either the installed version or latest version for a given snap.""" + snap = self._snap_map.get(snap_name, None) + if snap is None: + # The snapd cache file may not have existed when _snap_map was + # populated. This is normal. + try: + self._snap_map[snap_name] = self._load_info(snap_name) + except SnapAPIError: + raise SnapNotFoundError("Snap '{}' not found!".format(snap_name)) + + return self._snap_map[snap_name] + + @property + def snapd_installed(self) -> bool: + """Check whether snapd has been installled on the system.""" + return os.path.isfile("/usr/bin/snap") + + def _load_available_snaps(self) -> None: + """Load the list of available snaps from disk. + + Leave them empty and lazily load later if asked for. + """ + if not os.path.isfile("/var/cache/snapd/names"): + # The snap catalog may not be populated yet; this is normal. + # snapd updates the cache infrequently and the cache file may not + # currently exist. + return + + with open("/var/cache/snapd/names", "r") as f: + for line in f: + if line.strip(): + self._snap_map[line.strip()] = None + + def _load_installed_snaps(self) -> None: + """Load the installed snaps into the dict.""" + installed = self._snap_client.get_installed_snaps() + + for i in installed: + snap = Snap( + name=i["name"], + state=SnapState.Latest, + channel=i["channel"], + revision=i["revision"], + confinement=i["confinement"], + apps=i.get("apps", None), + ) + self._snap_map[snap.name] = snap + + def _load_info(self, name) -> Snap: + """Load info for snaps which are not installed if requested. + + Args: + name: a string representing the name of the snap + """ + info = self._snap_client.get_snap_information(name) + + return Snap( + name=info["name"], + state=SnapState.Available, + channel=info["channel"], + revision=info["revision"], + confinement=info["confinement"], + apps=None, + ) + + +@_cache_init +def add( + snap_names: Union[str, List[str]], + state: Union[str, SnapState] = SnapState.Latest, + channel: Optional[str] = "", + classic: Optional[bool] = False, + cohort: Optional[str] = "", + revision: Optional[str] = None, +) -> Union[Snap, List[Snap]]: + """Add a snap to the system. + + Args: + snap_names: the name or names of the snaps to install + state: a string or `SnapState` representation of the desired state, one of + [`Present` or `Latest`] + channel: an (Optional) channel as a string. Defaults to 'latest' + classic: an (Optional) boolean specifying whether it should be added with classic + confinement. Default `False` + cohort: an (Optional) string specifying the snap cohort to use + revision: an (Optional) string specifying the snap revision to use + + Raises: + SnapError if some snaps failed to install or were not found. + """ + if not channel and not revision: + channel = "latest" + + snap_names = [snap_names] if isinstance(snap_names, str) else snap_names + if not snap_names: + raise TypeError("Expected at least one snap to add, received zero!") + + if isinstance(state, str): + state = SnapState(state) + + return _wrap_snap_operations(snap_names, state, channel, classic, cohort, revision) + + +@_cache_init +def remove(snap_names: Union[str, List[str]]) -> Union[Snap, List[Snap]]: + """Remove specified snap(s) from the system. + + Args: + snap_names: the name or names of the snaps to install + + Raises: + SnapError if some snaps failed to install. + """ + snap_names = [snap_names] if isinstance(snap_names, str) else snap_names + if not snap_names: + raise TypeError("Expected at least one snap to add, received zero!") + + return _wrap_snap_operations(snap_names, SnapState.Absent, "", False) + + +@_cache_init +def ensure( + snap_names: Union[str, List[str]], + state: str, + channel: Optional[str] = "", + classic: Optional[bool] = False, + cohort: Optional[str] = "", + revision: Optional[int] = None, +) -> Union[Snap, List[Snap]]: + """Ensure specified snaps are in a given state on the system. + + Args: + snap_names: the name(s) of the snaps to operate on + state: a string representation of the desired state, from `SnapState` + channel: an (Optional) channel as a string. Defaults to 'latest' + classic: an (Optional) boolean specifying whether it should be added with classic + confinement. Default `False` + cohort: an (Optional) string specifying the snap cohort to use + revision: an (Optional) integer specifying the snap revision to use + + When both channel and revision are specified, the underlying snap install/refresh + command will determine the precedence (revision at the time of adding this) + + Raises: + SnapError if the snap is not in the cache. + """ + if not revision and not channel: + channel = "latest" + + if state in ("present", "latest") or revision: + return add(snap_names, SnapState(state), channel, classic, cohort, revision) + else: + return remove(snap_names) + + +def _wrap_snap_operations( + snap_names: List[str], + state: SnapState, + channel: str, + classic: bool, + cohort: Optional[str] = "", + revision: Optional[str] = None, +) -> Union[Snap, List[Snap]]: + """Wrap common operations for bare commands.""" + snaps = {"success": [], "failed": []} + + op = "remove" if state is SnapState.Absent else "install or refresh" + + for s in snap_names: + try: + snap = _Cache[s] + if state is SnapState.Absent: + snap.ensure(state=SnapState.Absent) + else: + snap.ensure( + state=state, classic=classic, channel=channel, cohort=cohort, revision=revision + ) + snaps["success"].append(snap) + except SnapError as e: + logger.warning("Failed to {} snap {}: {}!".format(op, s, e.message)) + snaps["failed"].append(s) + except SnapNotFoundError: + logger.warning("Snap '{}' not found in cache!".format(s)) + snaps["failed"].append(s) + + if len(snaps["failed"]): + raise SnapError( + "Failed to install or refresh snap(s): {}".format(", ".join(list(snaps["failed"]))) + ) + + return snaps["success"] if len(snaps["success"]) > 1 else snaps["success"][0] + + +def install_local( + filename: str, classic: Optional[bool] = False, dangerous: Optional[bool] = False +) -> Snap: + """Perform a snap operation. + + Args: + filename: the path to a local .snap file to install + classic: whether to use classic confinement + dangerous: whether --dangerous should be passed to install snaps without a signature + + Raises: + SnapError if there is a problem encountered + """ + args = [ + "snap", + "install", + filename, + ] + if classic: + args.append("--classic") + if dangerous: + args.append("--dangerous") + try: + result = subprocess.check_output(args, universal_newlines=True).splitlines()[-1] + snap_name, _ = result.split(" ", 1) + snap_name = ansi_filter.sub("", snap_name) + + c = SnapCache() + + try: + return c[snap_name] + except SnapAPIError as e: + logger.error( + "Could not find snap {} when querying Snapd socket: {}".format(snap_name, e.body) + ) + raise SnapError("Failed to find snap {} in Snap cache".format(snap_name)) + except CalledProcessError as e: + raise SnapError("Could not install snap {}: {}".format(filename, e.output)) + + +def _system_set(config_item: str, value: str) -> None: + """Set system snapd config values. + + Args: + config_item: name of snap system setting. E.g. 'refresh.hold' + value: value to assign + """ + args = ["snap", "set", "system", "{}={}".format(config_item, value)] + try: + subprocess.check_call(args, universal_newlines=True) + except CalledProcessError: + raise SnapError("Failed setting system config '{}' to '{}'".format(config_item, value)) + + +def hold_refresh(days: int = 90, forever: bool = False) -> bool: + """Set the system-wide snap refresh hold. + + Args: + days: number of days to hold system refreshes for. Maximum 90. Set to zero to remove hold. + forever: if True, will set a hold forever. + """ + if not isinstance(forever, bool): + raise TypeError("forever must be a bool") + if not isinstance(days, int): + raise TypeError("days must be an int") + if forever: + _system_set("refresh.hold", "forever") + logger.info("Set system-wide snap refresh hold to: forever") + elif days == 0: + _system_set("refresh.hold", "") + logger.info("Removed system-wide snap refresh hold") + else: + # Currently the snap daemon can only hold for a maximum of 90 days + if not 1 <= days <= 90: + raise ValueError("days must be between 1 and 90") + # Add the number of days to current time + target_date = datetime.now(timezone.utc).astimezone() + timedelta(days=days) + # Format for the correct datetime format + hold_date = target_date.strftime("%Y-%m-%dT%H:%M:%S%z") + # Python dumps the offset in format '+0100', we need '+01:00' + hold_date = "{0}:{1}".format(hold_date[:-2], hold_date[-2:]) + # Actually set the hold date + _system_set("refresh.hold", hold_date) + logger.info("Set system-wide snap refresh hold to: %s", hold_date) diff --git a/pyproject.toml b/pyproject.toml index 1e0f419..0ba3f91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,6 @@ log_cli_level = "INFO" # Formatting tools configuration [tool.black] line-length = 99 -target-version = ["py38"] # Linting tools configuration [tool.ruff] diff --git a/requirements.txt b/requirements.txt index 1d1c4fc..6e317f3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -ops==2.6.0 -cryptography==3.4.0 +ops==2.7.0 +cryptography==41.0.4 jinja2==3.1.2 git+https://github.com/canonical/ops-lib-nrpe.git#egg=ops-lib-nrpe diff --git a/src/charm.py b/src/charm.py index d702431..f2a5570 100755 --- a/src/charm.py +++ b/src/charm.py @@ -2,25 +2,32 @@ """Charmed Machine Operator for the NATS.""" +from __future__ import annotations + import hashlib import logging import random import socket import string -import subprocess from pathlib import Path -from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.serialization import load_pem_private_key -from cryptography.x509 import load_pem_x509_certificate -from interfaces import CAClient, NatsClient, NatsCluster -from jinja2 import Environment, FileSystemLoader +from nats_config import NATS from nrpe.client import NRPEClient # noqa: E402 -from ops import EventBase, EventSource, StoredState +from ops import ( + ConfigChangedEvent, + EventBase, + EventSource, + InstallEvent, + RelationJoinedEvent, + StoredState, +) from ops.charm import CharmBase, CharmEvents from ops.main import main -from ops.model import ActiveStatus, BlockedStatus, ModelError, WaitingStatus +from ops.model import ActiveStatus, BlockedStatus, ModelError +from relations.caclient_requirer import CAClientRequires +from relations.cluster_peers import NatsCluster +from relations.natsclient_provider import NATSClientProvider logger = logging.getLogger(__name__) @@ -39,6 +46,13 @@ class NatsCharmEvents(CharmEvents): SNAP_COMMON_PATH = Path("/var/snap/nats/common") SERVER_PATH = SNAP_COMMON_PATH / "server" +SNAP_NAME = "nats" +NATS_SERVER_CONFIG_PATH = SERVER_PATH / "nats.cfg" +AUTH_TOKEN_PATH = SERVER_PATH / "auth_secret" +AUTH_TOKEN_LENGTH = 64 +TLS_KEY_PATH = SERVER_PATH / "key.pem" +TLS_CERT_PATH = SERVER_PATH / "cert.pem" +TLS_CA_CERT_PATH = SERVER_PATH / "ca.pem" class NatsCharm(CharmBase): @@ -47,48 +61,40 @@ class NatsCharm(CharmBase): on: CharmEvents = NatsCharmEvents() state = StoredState() - NATS_SERVICE = "snap.nats.server.service" - NATS_SERVER_CONFIG_PATH = SERVER_PATH / "nats.cfg" - AUTH_TOKEN_PATH = SERVER_PATH / "auth_secret" - AUTH_TOKEN_LENGTH = 64 - TLS_KEY_PATH = SERVER_PATH / "key.pem" - TLS_CERT_PATH = SERVER_PATH / "cert.pem" - TLS_CA_CERT_PATH = SERVER_PATH / "ca.pem" - def __init__(self, *args): super().__init__(*args) + self._snap = NATS() + self.state.set_default( - is_started=False, - auth_token=NatsCharm.get_auth_token(self.AUTH_TOKEN_LENGTH), - use_tls=None, - use_tls_ca=None, - nats_config_hash=None, - client_port=None, + auth_token=NatsCharm.get_auth_token(AUTH_TOKEN_LENGTH), ) self.framework.observe(self.on.install, self._on_install) self.framework.observe(self.on.start, self._on_start) - self.framework.observe(self.on.upgrade_charm, self._on_upgrade_charm) + self.framework.observe(self.on.upgrade_charm, self._on_config_changed) self.framework.observe(self.on.config_changed, self._on_config_changed) listen_on_all_addresses = self.model.config["listen-on-all-addresses"] self.cluster = NatsCluster(self, "cluster", listen_on_all_addresses) - self.framework.observe(self.on.cluster_relation_changed, self._on_cluster_relation_changed) + self.framework.observe(self.on.cluster_relation_changed, self._on_config_changed) - self.client = NatsClient( + self.nats_client = NATSClientProvider( self, "client", listen_on_all_addresses, self.model.config["client-port"] ) self.framework.observe(self.on.client_relation_joined, self._on_client_relation_joined) - self.ca_client = CAClient(self, "ca-client") + self.ca_client = CAClientRequires(self, "ca-client") self.framework.observe(self.ca_client.on.tls_config_ready, self._on_tls_config_ready) self.framework.observe(self.ca_client.on.ca_available, self._on_ca_available) self.nrpe_client = NRPEClient(self, "nrpe-external-master") - self.framework.observe(self.nrpe_client.on.nrpe_available, self._on_nrpe_available) + self.framework.observe(self.nrpe_client.on.nrpe_available, self._on_nrpe_ready) - def _on_install(self, _): + def _on_client_relation_joined(self, event: RelationJoinedEvent): + self.nats_client.expose_nats(auth_token=self.state.auth_token) + + def _on_install(self, event: InstallEvent): try: core_res = self.model.resources.fetch("core") except ModelError: @@ -97,74 +103,56 @@ def _on_install(self, _): nats_res = self.model.resources.fetch("nats") except ModelError: nats_res = None + channel = self.model.config["snap-channel"] + self._snap.install(channel=channel, core_res=core_res, nats_res=nats_res) + + def _on_config_changed(self, event: ConfigChangedEvent): + self._reconfigure_nats(self.model.config) - cmd = ["snap", "install"] - # Install the snaps from a resource if provided. Alternatively, snapd - # will attempt to download it automatically. - if core_res is not None and Path(core_res).stat().st_size: - subprocess.check_call(cmd + ["--dangerous", core_res]) - nats_cmd = cmd - if nats_res is not None and Path(nats_res).stat().st_size: - nats_cmd += ["--dangerous", nats_res] - else: - channel = self.model.config["snap-channel"] - nats_cmd += ["nats", "--channel", channel] - subprocess.check_call(nats_cmd) - subprocess.check_call(["snap", "stop", "nats", "--disable"]) - SERVER_PATH.mkdir(exist_ok=True, mode=0o0700) - - def handle_tls_config(self): - """Handle TLS parameters passed via charm config. - - Values are loaded and parsed to provide basic validation and then used to - determine whether to use TLS in a charm by or not. If TLS is to be used, - the TLS config content is written to the necessary files. - """ - tls_key = self.model.config["tls-key"] - if tls_key: - load_pem_private_key(tls_key.encode("utf-8"), password=None, backend=default_backend()) - tls_cert = self.model.config["tls-cert"] - if tls_cert: - load_pem_x509_certificate(tls_cert.encode("utf-8"), backend=default_backend()) - tls_ca_cert = self.model.config["tls-ca-cert"] - if tls_ca_cert: - load_pem_x509_certificate(tls_ca_cert.encode("utf-8"), backend=default_backend()) - - self.state.use_tls = tls_key and tls_cert - self.state.use_tls_ca = bool(tls_ca_cert) - - # Block if one of the values is specified but not the other. - if bool(tls_key) ^ bool(tls_cert): - self.status = BlockedStatus("both TLS key and TLS cert must be specified") - if self.state.use_tls: - self.TLS_KEY_PATH.write_text(tls_key) - self.TLS_CERT_PATH.write_text(tls_cert) - # A CA cert is optional because NATS may rely on system-trusted (core snap) CA certs. - if self.state.use_tls_ca: - self.TLS_CA_CERT_PATH.write_text(tls_ca_cert) - self.client._set_tls_ca(tls_ca_cert) - - def _on_nrpe_available(self, _): - self._reconfigure_nats() - - def _on_ca_available(self, _): - self._reconfigure_nats() - - def _on_tls_config_ready(self, _): - self.TLS_KEY_PATH.write_bytes( - self.ca_client.key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), + def _on_tls_config_ready(self, event): + if self.config["tls-key"] and self.config["tls-cert"]: + logger.info( + "Not reconfiguring NATS with CA as model configuration \ + already has certificates" ) + return + current_config = dict(self.model.config) + logger.info("Configuring CA certificates from relation") + key = self.ca_client.key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ).decode("utf-8") + cert = self.ca_client.certificate.public_bytes(encoding=serialization.Encoding.PEM).decode( + "utf-8" ) - self.TLS_CERT_PATH.write_bytes( - self.ca_client.certificate.public_bytes(encoding=serialization.Encoding.PEM) - ) - self.TLS_CA_CERT_PATH.write_bytes( - self.ca_client.ca_certificate.public_bytes(encoding=serialization.Encoding.PEM) - ) - self._reconfigure_nats() + ca_cert = self.ca_client.ca_certificate.public_bytes( + encoding=serialization.Encoding.PEM + ).decode("utf-8") + # tls with CA's config + current_config["tls-cert"] = cert + current_config["tls-key"] = key + current_config["tls-ca-cert"] = ca_cert + self._reconfigure_nats(current_config) + + def _on_ca_available(self, event): + cn, sans = self._generate_cn_and_san() + self.ca_client.request_server_certificate(cn, list(sans)) + + def _on_nrpe_ready(self, event): + if self.nrpe_client.is_available: + check_name = "check_{}".format(self.model.unit.name.replace("/", "_")) + self.nrpe_client.add_check( + command=[ + "/usr/lib/nagios/plugins/check_tcp", + "-H", + str(self.nats_client.listen_address), + "-p", + str(self.model.config["client-port"]), + ], + name=check_name, + ) + self.nrpe_client.commit() def _generate_content_hash(self, content): m = hashlib.sha256() @@ -172,122 +160,68 @@ def _generate_content_hash(self, content): return m.hexdigest() # FIXME: reduce this function's complexity to satisfy the linter - def _reconfigure_nats(self): # noqa: C901 + def _reconfigure_nats(self, config): logger.info("Reconfiguring NATS") - self.handle_tls_config() ctxt = { - "client_port": self.model.config["client-port"], - "cluster_port": self.model.config["cluster-port"], + "client_port": config["client-port"], + "cluster_port": config["cluster-port"], "cluster_listen_address": self.cluster.listen_address, - "client_listen_address": self.client.listen_address, + "client_listen_address": self.nats_client.listen_address, "auth_token": self.state.auth_token, "peer_addresses": self.cluster.peer_addresses, - "debug": self.model.config["debug"], - "trace": self.model.config["trace"], + "debug": config["debug"], + "trace": config["trace"], + "tls_cert": config["tls-cert"], + "tls_key": config["tls-key"], + "tls_ca_cert": config["tls-ca-cert"], + "verify_tls_clients": config["verify-tls-clients"], + "map_tls_clients": config["map-tls-clients"], } - # Config is used in priority to using a relation to a CA charm. - if self.state.use_tls: - ctxt.update( - { - "use_tls": True, - "tls_key_path": self.TLS_KEY_PATH, - "tls_cert_path": self.TLS_CERT_PATH, - "verify_tls_clients": self.model.config["verify-tls-clients"], - "map_tls_clients": self.model.config["map-tls-clients"], - } - ) - if self.state.use_tls_ca: - ctxt["tls_ca_cert_path"] = self.TLS_CA_CERT_PATH - elif self.ca_client.is_joined: - if not self.ca_client.is_ready: - # TODO: move SAN generation into a separate function - # Use a reverse resolution for bind-address of a cluster endpoint as a heuristic to - # determine a common name. - common_name = socket.getnameinfo( - (str(self.cluster.listen_address), 0), socket.NI_NAMEREQD - )[0] - san_addresses = set() - san_addresses.add(str(self.cluster.listen_address)) - san_addresses.add(str(self.cluster.ingress_address)) - san_addresses.add(str(self.client.listen_address)) - for addr in self.client.ingress_addresses: - san_addresses.add(str(addr)) - if self.model.config["listen-on-all-addresses"]: - raise RuntimeError( - "Generating certificates with listen-on-all-addresses option is not supported yet" - ) - # TODO: update with all host interface addresses to implement this for listen-on-all-addresses. - san_hostnames = set() - for addr in san_addresses: - # May raise gaierror. - name = socket.getnameinfo((str(addr), 0), socket.NI_NAMEREQD)[0] - san_hostnames.add(name) - sans = san_addresses.union(san_hostnames) - self.ca_client.request_server_certificate(common_name, list(sans)) - self.model.unit.status = WaitingStatus( - "Waiting for TLS configuration data from the CA client." - ) - return - ctxt.update( - { - "use_tls": True, - "tls_key_path": self.TLS_KEY_PATH, - "tls_cert_path": self.TLS_CERT_PATH, - "verify_tls_clients": self.model.config["verify-tls-clients"], - "map_tls_clients": self.model.config["map-tls-clients"], - "tls_ca_cert_path": self.TLS_CA_CERT_PATH, - } - ) - self.client._set_tls_ca( - self.ca_client.ca_certificate.public_bytes( - encoding=serialization.Encoding.PEM - ).decode("utf-8") - ) + self.framework.breakpoint() + if ctxt["tls_ca_cert"]: + self.nats_client.set_tls_ca(ctxt["tls_ca_cert"]) - if self.nrpe_client.is_available: - check_name = "check_{}".format(self.model.unit.name.replace("/", "_")) - self.nrpe_client.add_check( - command=[ - "/usr/lib/nagios/plugins/check_tcp", - "-H", - str(self.client.listen_address), - "-p", - str(self.model.config["client-port"]), - ], - name=check_name, - ) - self.nrpe_client.commit() + changed = self._snap.configure(ctxt) + if changed: + self.nats_client.expose_nats(auth_token=self.state.auth_token) - tenv = Environment(loader=FileSystemLoader("templates")) - template = tenv.get_template("nats.cfg.j2") - rendered_content = template.render(ctxt) - content_hash = self._generate_content_hash(rendered_content) - old_hash = self.state.nats_config_hash - if old_hash != content_hash: - logging.info( - f"Config has changed - re-rendering a template to {self.NATS_SERVER_CONFIG_PATH}" - ) - self.state.nats_config_hash = content_hash - self.NATS_SERVER_CONFIG_PATH.write_text(rendered_content) - if self.state.is_started: - subprocess.check_call(["systemctl", "restart", self.NATS_SERVICE]) - self.client.expose_nats(auth_token=self.state.auth_token) - - client_port = self.model.config["client-port"] - if (client_port is None or client_port == 0) and ( - self.state.client_port is not None or len(self.state.client_port) > 0 - ): - self._close_port(self.state.client_port) - else: - port = "{}/tcp".format(client_port) - if self.state.client_port is not None and port != self.state.client_port: - self._close_port(self.state.client_port) - self._open_port(port) - self.state.client_port = port + client_port = int(self.model.config["client-port"]) + self.unit.set_ports(client_port) + logger.info(f"Opened port: {client_port} for access") + + if not self._snap.running: + self.unit.status = BlockedStatus("failed to configure nats") + return + logger.info("NATS configuration complete") self.unit.status = ActiveStatus() + def _generate_cn_and_san(self) -> tuple[str, set[str]]: + # Use a reverse resolution for bind-address of a cluster endpoint as a heuristic to + # determine a common name. + common_name = socket.getnameinfo( + (str(self.cluster.listen_address), 0), socket.NI_NAMEREQD + )[0] + san_addresses = set() + san_addresses.add(str(self.cluster.listen_address)) + san_addresses.add(str(self.cluster.ingress_address)) + san_addresses.add(str(self.nats_client.listen_address)) + for addr in self.nats_client.ingress_addresses: + san_addresses.add(str(addr)) + if self.model.config["listen-on-all-addresses"]: + raise RuntimeError( + "Generating certificates with listen-on-all-addresses option is not supported yet" + ) + # TODO: update with all host interface addresses to implement this for listen-on-all-addresses. + san_hostnames = set() + for addr in san_addresses: + # May raise gaierror. + name = socket.getnameinfo((str(addr), 0), socket.NI_NAMEREQD)[0] + san_hostnames.add(name) + sans = san_addresses.union(san_hostnames) + return common_name, sans + @classmethod def get_auth_token(cls, length=None): """Generate a random auth token.""" @@ -298,29 +232,11 @@ def get_auth_token(cls, length=None): return "".join([rng.choice(alphanumeric_chars) for _ in range(length)]) def _on_start(self, _): - subprocess.check_call(["snap", "start", "nats", "--enable"]) - self.state.is_started = True - self.on.nats_started.emit() + self._snap.start() + if self._snap.running: + self.on.nats_started.emit() self.model.unit.status = ActiveStatus() - def _on_cluster_relation_changed(self, _): - self._reconfigure_nats() - - def _on_client_relation_joined(self, _): - self._reconfigure_nats() - - def _on_config_changed(self, _): - self._reconfigure_nats() - - def _on_upgrade_charm(self, _): - self._reconfigure_nats() - - def _open_port(self, port): - subprocess.check_call(["open-port", port]) - - def _close_port(self, port): - subprocess.check_call(["close-port", port]) - if __name__ == "__main__": main(NatsCharm) diff --git a/src/interfaces.py b/src/interfaces.py deleted file mode 100644 index 8796926..0000000 --- a/src/interfaces.py +++ /dev/null @@ -1,248 +0,0 @@ -"""List of defined interfaces.""" -import ipaddress -import json -import logging - -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.serialization import load_pem_private_key -from cryptography.x509 import load_pem_x509_certificate -from ops.framework import EventBase, EventSource, Object, ObjectEvents, StoredState - -logger = logging.getLogger(__name__) - - -class NatsCluster(Object): - """Peer relation `nats-cluster` interface for the NATS charm.""" - - def __init__(self, charm, relation_name, listen_on_all_addresses): - super().__init__(charm, relation_name) - self._relation_name = relation_name - if listen_on_all_addresses: - # This will create a listening socket for all IPv4 and IPv6 addresses. - self._listen_address = ipaddress.ip_address("0.0.0.0") - self._ingress_address = ipaddress.ip_address("0.0.0.0") - else: - self._listen_address = None - self._ingress_address = None - - @property - def is_joined(self): - """Property to know if the relation has been joined.""" - return self.framework.model.get_relation(self._relation_name) is not None - - @property - def relation(self): - """Property to get the name of the relation.""" - return self.framework.model.get_relation(self._relation_name) - - @property - def peer_addresses(self): - """Property to get addresses of the units in the cluster.""" - addresses = [] - relation = self.relation - if relation: - for u in relation.units: - addresses.append(relation.data[u]["ingress-address"]) - return addresses - - @property - def listen_address(self): - """Property to get the listen address.""" - if self._listen_address is None: - self._listen_address = self.model.get_binding(self._relation_name).network.bind_address - return self._listen_address - - @property - def ingress_address(self): - """Property to get the ingress address.""" - if self._ingress_address is None: - self._ingress_address = self.model.get_binding( - self._relation_name - ).network.ingress_address - return self._ingress_address - - -class NatsClient(Object): - """`nats` interface for the client to NATS.""" - - state = StoredState() - - def __init__(self, charm, relation_name, listen_on_all_addresses, client_port): - super().__init__(charm, relation_name) - self._relation_name = relation_name - self._client_port = client_port - self._listen_on_all_addresses = listen_on_all_addresses - if listen_on_all_addresses: - # This will create a listening socket for all IPv4 and IPv6 addresses. - self._listen_address = ipaddress.ip_address("0.0.0.0") - else: - self._listen_address = None - self._ingress_addresses = None - self.state.set_default(tls_ca=None) - - @property - def listen_address(self): - """Property to get the listen address.""" - if self._listen_address is None: - addresses = set() - for relation in self.model.relations[self._relation_name]: - address = self.model.get_binding(relation).network.bind_address - addresses.add(address) - if len(addresses) > 1: - raise Exception( - "Multiple potential listen addresses detected: NATS does not support that" - ) - elif addresses == 1: - self._listen_address = addresses.pop() - else: - # Default to network information associated with an endpoint binding itself in absence of relations. - self._listen_address = self.model.get_binding( - self._relation_name - ).network.bind_address - return self._listen_address - - def _set_tls_ca(self, tls_ca): - self.state.tls_ca = tls_ca - - def expose_nats(self, auth_token=None): - """Exposes NATS to the outside world by publishing cert and url to relation data.""" - relations = self.model.relations[self._relation_name] - for rel in relations: - token_field = "" - if auth_token is not None: - token_field = f"{auth_token}@" - if self.state.tls_ca is not None: - url = f"tls://{token_field}{self.listen_address}:{self._client_port}" - else: - url = f"nats://{token_field}{self.listen_address}:{self._client_port}" - rel.data[self.model.unit]["url"] = url - if self.model.unit.is_leader() and self.state.tls_ca is not None: - rel.data[self.model.app]["ca_cert"] = self.state.tls_ca - - @property - def ingress_addresses(self): - """Property to get the ingress address.""" - # Even though NATS does not support multiple listening addresses that does not mean there - # cannot be multiple ingress addresses clients would use. - if self._ingress_addresses is None: - self._ingress_addresses = set() - for relation in self.model.relations[self._relation_name]: - self._ingress_addresses.add( - self.model.get_binding(relation).network.ingress_address - ) - return self._ingress_addresses - - -class CAAvailable(EventBase): - """Event for knowing if the CA is available.""" - - pass - - -class TlsConfigReady(EventBase): - """Event for configuring if the `tls-certificates` relation is ready.""" - - pass - - -class CAClientEvents(ObjectEvents): - """Event emitter for the NATS charm.""" - - ca_available = EventSource(CAAvailable) - tls_config_ready = EventSource(TlsConfigReady) - - -class CAClient(Object): - """Implement for the requires side in `tls-ceritificates` relation.""" - - on: ObjectEvents = CAClientEvents() - state = StoredState() - - def __init__(self, charm, relation_name): - """Initialise relation for `tls-certificates`. - - Charm -- a NatsCharm instance. - relation_name -- a name of the relation with the tls-certificates interface for this charm. - common_name -- a name to place into the CN field of a certificate. - sans -- Subject Alternative Names (per RFC 5280): names or IPs to include in a requested certificate. - """ - super().__init__(charm, relation_name) - self._relation_name = relation_name - self._common_name = None - self._sans = None - - self.state.set_default(ca_certificate=None, key=None, certificate=None) - - self.framework.observe(charm.on[relation_name].relation_joined, self._on_relation_joined) - self.framework.observe(charm.on[relation_name].relation_changed, self._on_relation_changed) - - @property - def is_joined(self): - """Property to know if the relation has been joined.""" - return self.framework.model.get_relation(self._relation_name) is not None - - @property - def is_ready(self): - """Property to know if the relation is ready.""" - return all( - p is not None - for p in (self.state.certificate, self.state.key, self.state.ca_certificate) - ) - - @property - def certificate(self): - """Property to get the configured certificate.""" - return load_pem_x509_certificate( - self.state.certificate.encode("utf-8"), backend=default_backend() - ) - - @property - def key(self): - """Property to get the configured private key.""" - return load_pem_private_key( - self.state.key.encode("utf-8"), password=None, backend=default_backend() - ) - - @property - def ca_certificate(self): - """Property to get the configured CA certificate.""" - return load_pem_x509_certificate( - self.state.ca_certificate.encode("utf-8"), backend=default_backend() - ) - - def _on_relation_joined(self, event): - self.on.ca_available.emit() - - def request_server_certificate(self, common_name, sans): - """Request a new server certificate. - - If arguments do not change from a previous request, then a new certificate will not - be requested. This method can be useful if a list of SANS has changed during the - lifetime of a charm. - - common_name -- a new common name to use. - sans -- an updated list of Subject Alternative Names to use. - """ - rel = self.framework.model.get_relation(self._relation_name) - logger.info(f"Requesting a CA certificate. Common name: {common_name}, SANS: {sans}") - rel_data = rel.data[self.model.unit] - rel_data["common_name"] = common_name - rel_data["sans"] = json.dumps(sans) - - def _on_relation_changed(self, event): - # easy-rsa is not HA so there is only one unit to work with and Vault uses one leader unit to - # write responses and does not (at the time of writing) rely on app relation data. - remote_data = event.relation.data[event.unit] - - cert = remote_data.get(f'{self.model.unit.name.replace("/", "_")}.server.cert') - key = remote_data.get(f'{self.model.unit.name.replace("/", "_")}.server.key') - ca = remote_data.get("ca") - if cert is None or key is None or ca is None: - logger.info( - "A CA has not yet exposed a requested certificate, key and CA certificate." - ) - return - self.state.certificate = cert - self.state.key = key - self.state.ca_certificate = ca - self.on.tls_config_ready.emit() diff --git a/src/nats_config.py b/src/nats_config.py new file mode 100644 index 0000000..abaedb2 --- /dev/null +++ b/src/nats_config.py @@ -0,0 +1,227 @@ +# See LICENSE file for licensing details. + +"""Control NATS on a host system. Provides a NATS class.""" + +from __future__ import annotations + +import hashlib +import logging +from dataclasses import asdict, dataclass, field +from pathlib import Path + +from charms.operator_libs_linux.v2 import snap +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.serialization import load_pem_private_key +from cryptography.x509 import load_pem_x509_certificate +from jinja2 import Environment, FileSystemLoader + +logger = logging.getLogger(__name__) + +SNAP_NAME = "nats" + + +@dataclass +class NATSConfig: + """Class representing configuration data for NATS.""" + + client_port: int = -1 + cluster_port: int = -1 + cluster_listen_address: str = "" + client_listen_address: str = "" + peer_addresses: list[str] = field(default_factory=list) + debug: bool = False + trace: bool = False + use_tls: bool = False + tls_key_path: Path | None = None + tls_cert_path: Path | None = None + tls_ca_cert_path: Path | None = None + verify_tls_clients: bool = False + map_tls_clients: bool = False + auth_token: str = field(default="", repr=False) + + +class NATS: + """Class representing NATS on a host system.""" + + AUTH_TOKEN_LENGTH = 64 + SNAP_NAME = "nats" + + SNAP_COMMON_PATH = Path("/var/snap/nats/common") + SERVER_PATH = SNAP_COMMON_PATH / "server" + + CONFIG_PATH = SERVER_PATH / "nats.cfg" + AUTH_TOKEN_PATH = SERVER_PATH / "auth_secret" + TLS_KEY_PATH = SERVER_PATH / "key.pem" + TLS_CERT_PATH = SERVER_PATH / "cert.pem" + TLS_CA_CERT_PATH = SERVER_PATH / "ca.pem" + + def __init__(self, *args): + super().__init__(*args) + self._current_config = self._current_config_hash() + + def _generate_content_hash(self, content: str): + m = hashlib.sha256() + m.update(content.encode("utf-8")) + return m.hexdigest() + + def _current_config_hash(self) -> str: + config = None + try: + with open(self.CONFIG_PATH, "rb") as f: + config = f.read() + except FileNotFoundError: + logger.debug("Current configuration not found, no previous hash generated") + return "" + return self._generate_content_hash(config.decode("utf-8")) + + def install( + self, + channel: str | None = None, + core_res: Path | None = None, + nats_res: Path | None = None, + ): + """Install the NATS snap package.""" + # Install the snaps from a resource if provided. Alternatively, snapd + # will attempt to download it automatically. + if core_res and core_res.stat().st_size: + snap.install_local(core_res, dangerous=True) + + if nats_res and nats_res.stat().st_size: + snp = snap.install_local(nats_res, dangerous=True) + else: + snp = self._snap + + if not snp.present: + snp.ensure(snap.SnapState.Latest, channel=channel) + snp.stop(disable=True) + + self.SERVER_PATH.mkdir(exist_ok=True, mode=0o0700) + + try: + self._snap.ensure(snap.SnapState.Latest, channel=channel) + except snap.SnapError as e: + logger.error("could not install nats. Reason: %s", e.message) + logger.debug(e, exc_info=True) + raise e + + def refresh(self, channel: str): + """Refresh the NATS snap if there is a new revision.""" + # The operation here is exactly the same, so just call the install method + self.install(channel) + + def start(self): + """Start and enable NATS using the snap service.""" + self._snap.start(enable=True) + + def restart(self): + """Start and enable NATS using the snap service.""" + self._snap.restart() + + def stop(self): + """Stop NATS using the snap service.""" + self._snap.stop(disable=True) + + def remove(self): + """Remove the NATS snap, preserving config and data.""" + self._snap.ensure(snap.SnapState.Absent) + + def _setup_tls(self, tls_cert: str, tls_key: str, ca_cert: str | None = None) -> bool: + """Handle TLS parameters passed via charm config. + + Values are loaded and parsed to provide basic validation and then used to + determine whether to use TLS in a charm by or not. If TLS is to be used, + the TLS config content is written to the necessary files. + """ + only_one_is_set = bool(tls_cert) ^ bool(tls_key) + if only_one_is_set: + raise Exception("both TLS key and TLS cert must be specified") + use_tls = all([bool(tls_cert), bool(tls_key)]) + + if not use_tls: + return False + + if tls_key: + load_pem_private_key(tls_key.encode("utf-8"), password=None, backend=default_backend()) + if tls_cert: + load_pem_x509_certificate(tls_cert.encode("utf-8"), backend=default_backend()) + if ca_cert: + load_pem_x509_certificate(ca_cert.encode("utf-8"), backend=default_backend()) + + self.TLS_KEY_PATH.write_text(tls_key) + self.TLS_CERT_PATH.write_text(tls_cert) + # A CA cert is optional because NATS may rely on system-trusted (core snap) CA certs. + if ca_cert: + self.TLS_CA_CERT_PATH.write_text(ca_cert) + logger.debug("Created ca cert for nats") + return True + + def configure(self, config: dict, restart: bool = True) -> bool: + """Configure NATS on the host system. Restart NATS by default.""" + config_changed = False + use_tls = self._setup_tls( + tls_key=config["tls_key"], + tls_cert=config["tls_cert"], + ca_cert=config.get("tls_ca_cert"), + ) + + cfg = NATSConfig( + use_tls=use_tls, + tls_cert_path=self.TLS_CERT_PATH, + tls_key_path=self.TLS_KEY_PATH, + tls_ca_cert_path=self.TLS_CA_CERT_PATH, + map_tls_clients=config.get("map_tls_clients", False), + verify_tls_clients=config.get("verify_tls_clients", False), + auth_token=config["auth_token"], + debug=config["debug"], + trace=config["trace"], + client_listen_address=config["client_listen_address"], + client_port=config["client_port"], + cluster_listen_address=config["cluster_listen_address"], + cluster_port=config["cluster_port"], + peer_addresses=config["peer_addresses"], + ) + new_config = self._generate_config(cfg) + config_hash = self._generate_content_hash(new_config) + + if self._current_config != config_hash: + logger.info(f"Config has changed - re-rendering a template to {self.CONFIG_PATH}") + self.CONFIG_PATH.write_text(new_config) + self._current_config = config_hash + config_changed = True + + # Restart the snap service only if it was running already + if restart: + self._snap.restart() + return config_changed or use_tls + + def _generate_config(self, config: NATSConfig) -> str: + tenv = Environment(loader=FileSystemLoader("templates")) + template = tenv.get_template("nats.cfg.j2") + return template.render(asdict(config)) + + @property + def installed(self): + """Report if the NATS snap is installed.""" + return self._snap.present + + @property + def running(self): + """Report if the 'server' snap service is running.""" + return self._snap.services["server"]["active"] + + @property + def version(self) -> str: + """Report the version of NATS currently installed.""" + if self.installed: + snaps = self._snap._snap_client.get_installed_snaps() + for installed_snap in snaps: + if installed_snap["name"] == self._snap.name: + return installed_snap["version"] + + raise snap.SnapError(f"{SNAP_NAME} snap not installed, cannot fetch version") + + @property + def _snap(self): + """Return a representation of the NATS snap.""" + cache = snap.SnapCache() + return cache[SNAP_NAME] diff --git a/src/relations/caclient_requirer.py b/src/relations/caclient_requirer.py new file mode 100644 index 0000000..f0ebe65 --- /dev/null +++ b/src/relations/caclient_requirer.py @@ -0,0 +1,126 @@ +"""CA Client Relation Requirer.""" + +import json +import logging + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.serialization import load_pem_private_key +from cryptography.x509 import load_pem_x509_certificate +from ops import EventBase, EventSource, Object, ObjectEvents, StoredState + +logger = logging.getLogger(__name__) + + +class CAAvailable(EventBase): + """Event for knowing if the CA is available.""" + + pass + + +class TlsConfigReady(EventBase): + """Event for configuring if the `tls-certificates` relation is ready.""" + + pass + + +class CAClientEvents(ObjectEvents): + """Event emitter for the NATS charm.""" + + ca_available = EventSource(CAAvailable) + tls_config_ready = EventSource(TlsConfigReady) + + +class CAClientRequires(Object): + """Implement for the requires side in `tls-ceritificates` relation.""" + + on: ObjectEvents = CAClientEvents() + state = StoredState() + + def __init__(self, charm, relation_name): + """Initialise relation for `tls-certificates`. + + Charm -- a NatsCharm instance. + relation_name -- a name of the relation with the tls-certificates interface for this charm. + common_name -- a name to place into the CN field of a certificate. + sans -- Subject Alternative Names (per RFC 5280): names or IPs to include in a requested certificate. + """ + super().__init__(charm, relation_name) + self._relation_name = relation_name + self._common_name = None + self._sans = None + + self.state.set_default(ca_certificate=None, key=None, certificate=None) + + self.framework.observe(charm.on[relation_name].relation_joined, self._on_relation_joined) + self.framework.observe(charm.on[relation_name].relation_changed, self._on_relation_changed) + + @property + def is_joined(self): + """Property to know if the relation has been joined.""" + return self.framework.model.get_relation(self._relation_name) is not None + + @property + def is_ready(self): + """Property to know if the relation is ready.""" + return all( + p is not None + for p in (self.state.certificate, self.state.key, self.state.ca_certificate) + ) + + @property + def certificate(self): + """Property to get the configured certificate.""" + return load_pem_x509_certificate( + self.state.certificate.encode("utf-8"), backend=default_backend() + ) + + @property + def key(self): + """Property to get the configured private key.""" + return load_pem_private_key( + self.state.key.encode("utf-8"), password=None, backend=default_backend() + ) + + @property + def ca_certificate(self): + """Property to get the configured CA certificate.""" + return load_pem_x509_certificate( + self.state.ca_certificate.encode("utf-8"), backend=default_backend() + ) + + def _on_relation_joined(self, event): + self.on.ca_available.emit() + + def request_server_certificate(self, common_name, sans): + """Request a new server certificate. + + If arguments do not change from a previous request, then a new certificate will not + be requested. This method can be useful if a list of SANS has changed during the + lifetime of a charm. + + common_name -- a new common name to use. + sans -- an updated list of Subject Alternative Names to use. + """ + rel = self.framework.model.get_relation(self._relation_name) + logger.info(f"Requesting a CA certificate. Common name: {common_name}, SANS: {sans}") + rel_data = rel.data[self.model.unit] + rel_data["common_name"] = common_name + rel_data["sans"] = json.dumps(sans) + + def _on_relation_changed(self, event): + # easy-rsa is not HA so there is only one unit to work with and Vault uses one leader unit to + # write responses and does not (at the time of writing) rely on app relation data. + remote_data = event.relation.data[event.unit] + + cert = remote_data.get(f'{self.model.unit.name.replace("/", "_")}.server.cert') + key = remote_data.get(f'{self.model.unit.name.replace("/", "_")}.server.key') + ca = remote_data.get("ca") + if cert is None or key is None or ca is None: + logger.info( + "A CA has not yet exposed a requested certificate, key and CA certificate." + ) + return + self.state.certificate = cert + self.state.key = key + self.state.ca_certificate = ca + self.on.tls_config_ready.emit() diff --git a/src/relations/cluster_peers.py b/src/relations/cluster_peers.py new file mode 100644 index 0000000..431ece4 --- /dev/null +++ b/src/relations/cluster_peers.py @@ -0,0 +1,45 @@ +"""NATS Cluster Endpoint handler.""" +import ipaddress + +from ops.framework import Object + + +class NatsCluster(Object): + """Peer relation `nats-cluster` interface for the NATS charm.""" + + def __init__(self, charm, relation_name, listen_on_all_addresses): + super().__init__(charm, relation_name) + self._relation_name = relation_name + if listen_on_all_addresses: + # This will create a listening socket for all IPv4 and IPv6 addresses. + self._listen_address = ipaddress.ip_address("0.0.0.0") + self._ingress_address = ipaddress.ip_address("0.0.0.0") + else: + self._listen_address = None + self._ingress_address = None + + @property + def peer_addresses(self): + """Property to get addresses of the units in the cluster.""" + addresses = [] + relation = self.framework.model.get_relation(self._relation_name) + if relation: + for u in relation.units: + addresses.append(relation.data[u]["ingress-address"]) + return addresses + + @property + def listen_address(self): + """Property to get the listen address.""" + if not self._listen_address: + self._listen_address = self.model.get_binding(self._relation_name).network.bind_address + return self._listen_address + + @property + def ingress_address(self): + """Property to get the ingress address.""" + if not self._ingress_address: + self._ingress_address = self.model.get_binding( + self._relation_name + ).network.ingress_address + return self._ingress_address diff --git a/src/relations/natsclient_provider.py b/src/relations/natsclient_provider.py new file mode 100644 index 0000000..9113b58 --- /dev/null +++ b/src/relations/natsclient_provider.py @@ -0,0 +1,96 @@ +"""NATS Client Provider for `nats` interface.""" +import ipaddress +import logging + +from ops import JujuVersion, Object, SecretNotFoundError, StoredState + +logger = logging.getLogger(__name__) + + +class NATSClientProvider(Object): + """`nats` interface for the client to NATS.""" + + state = StoredState() + + def __init__(self, charm, relation_name, listen_on_all_addresses, client_port): + super().__init__(charm, relation_name) + self._relation_name = relation_name + self._client_port = client_port + self._listen_on_all_addresses = listen_on_all_addresses + if listen_on_all_addresses: + # This will create a listening socket for all IPv4 and IPv6 addresses. + self._listen_address = ipaddress.ip_address("0.0.0.0") + else: + self._listen_address = None + self._ingress_addresses = None + self.state.set_default(tls_ca=None) + if JujuVersion.from_environ().has_secrets: + self._secret_label = "nats-protected-url" + + @property + def listen_address(self): + """Property to get the listen address.""" + if self._listen_address is None: + addresses = set() + for relation in self.model.relations[self._relation_name]: + address = self.model.get_binding(relation).network.bind_address + addresses.add(address) + if len(addresses) > 1: + raise Exception( + "Multiple potential listen addresses detected: NATS does not support that" + ) + elif addresses == 1: + self._listen_address = addresses.pop() + else: + # Default to network information associated with an endpoint binding itself in absence of relations. + self._listen_address = self.model.get_binding( + self._relation_name + ).network.bind_address + return self._listen_address + + def set_tls_ca(self, tls_ca): + """Set CA to publish this certificate to the remote unit via the Client relation.""" + self.state.tls_ca = tls_ca + + def expose_nats(self, auth_token=None): + """Exposes NATS to the outside world by publishing cert and url to relation data.""" + relations = self.model.relations[self._relation_name] + for rel in relations: + token_field = "" + protocol = "nats" + + if auth_token is not None: + token_field = f"{auth_token}@" + if self.state.tls_ca: + protocol = "tls" + + url = f"{protocol}://{token_field}{self.listen_address}:{self._client_port}" + rel.data[self.model.unit]["url"] = url + if self.model.unit.is_leader() and self.state.tls_ca is not None: + rel.data[self.model.app]["ca_cert"] = self.state.tls_ca + + # Use secrets only if juju > 3.0 + # TODO: make this the only way to share url once all charms use the + # charm-lib and juju > 3.0 + if JujuVersion.from_environ().has_secrets: + try: + secret = self.model.get_secret(label=self._secret_label) + except SecretNotFoundError: + logger.debug("Secret not found, creating a new one") + secret = self.model.unit.add_secret( + content={"url": url}, label=self._secret_label + ) + secret.grant(rel) + + @property + def ingress_addresses(self): + """Property to get the ingress address.""" + # Even though NATS does not support multiple listening addresses that does not mean there + # cannot be multiple ingress addresses clients would use. + if self._ingress_addresses is None: + self._ingress_addresses = set() + for relation in self.model.relations[self._relation_name]: + self._ingress_addresses.add( + self.model.get_binding(relation).network.ingress_address + ) + return self._ingress_addresses diff --git a/tests/integration/relation_tests/application-charm/charmcraft.yaml b/tests/integration/relation_tests/application-charm/charmcraft.yaml new file mode 100644 index 0000000..9e861b9 --- /dev/null +++ b/tests/integration/relation_tests/application-charm/charmcraft.yaml @@ -0,0 +1,19 @@ +type: charm +bases: +- build-on: + - name: "ubuntu" + channel: "20.04" + - name: "ubuntu" + channel: "22.04" + run-on: + - name: "ubuntu" + channel: "20.04" + architectures: [arm64, amd64] + - name: "ubuntu" + channel: "22.04" + architectures: [arm64, amd64] +parts: + charm: + charm-requirements: ["requirements.txt"] + build-packages: + - git diff --git a/tests/integration/relation_tests/application-charm/config.yaml b/tests/integration/relation_tests/application-charm/config.yaml new file mode 100644 index 0000000..7a7cd45 --- /dev/null +++ b/tests/integration/relation_tests/application-charm/config.yaml @@ -0,0 +1,5 @@ +options: + check_clustering: + type: boolean + default: false + description: Whether to test clustering for the NATS cluster diff --git a/tests/integration/relation_tests/application-charm/metadata.yaml b/tests/integration/relation_tests/application-charm/metadata.yaml new file mode 100644 index 0000000..8bd526e --- /dev/null +++ b/tests/integration/relation_tests/application-charm/metadata.yaml @@ -0,0 +1,11 @@ +name: nats-tester +display-name: nats-test-app +description: a testing application for nats charm +summary: A simple application to relate to the nats interface to test the nats charm +subordinate: false +series: + - focal + - jammy +requires: + client: + interface: nats diff --git a/tests/integration/relation_tests/application-charm/requirements.txt b/tests/integration/relation_tests/application-charm/requirements.txt new file mode 100644 index 0000000..9b17834 --- /dev/null +++ b/tests/integration/relation_tests/application-charm/requirements.txt @@ -0,0 +1,2 @@ +nats-py==2.4.0 +ops==2.7.0 diff --git a/tests/integration/relation_tests/application-charm/src/charm.py b/tests/integration/relation_tests/application-charm/src/charm.py new file mode 100755 index 0000000..885939b --- /dev/null +++ b/tests/integration/relation_tests/application-charm/src/charm.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 + +import asyncio +import logging +import ssl + +import nats +import ops + +logger = logging.getLogger(__name__) + + +class ApplicationCharm(ops.CharmBase): + """Application charm that connects to database charms.""" + + def __init__(self, *args): + super().__init__(*args) + + # Default charm events. + self.framework.observe(self.on.start, self._on_start) + self.framework.observe(self.on.client_relation_changed, self._on_client_relation_changed) + + def _on_start(self, _): + self.unit.status = ops.ActiveStatus() + + def _on_client_relation_changed(self, event): + unit_data = event.relation.data.get(event.unit) + if not unit_data: + logger.error("data not found in relation") + self.unit.status = ops.BlockedStatus("waiting for relation data") + return + url = unit_data.get("url") + if not url: + logger.error("url not found") + self.unit.status = ops.BlockedStatus("waiting for relation data") + return + connect_opts = {} + if url.startswith("tls"): + cert = event.relation.data.get(event.app).get("ca_cert") + if not cert: + logger.error("ca_cert not found") + self.unit.status = ops.BlockedStatus("waiting for relation data") + return + tls = ssl.create_default_context(cadata=cert) + connect_opts.update({"tls": tls}) + + async def _verify_connection(url: str, opts: dict): + client = await nats.connect(url, **opts) + logger.info(f"connected to {url}") + if self.config["check_clustering"]: + logger.info("checking for clustering") + assert len(client.servers) > 1, f"NATS not clustered: {client.servers}" + channel_name = "test" + message = b"testing" + sub = await client.subscribe(channel_name) + await client.publish(channel_name, message) + msg = await sub.next_msg() + assert ( + msg.data == message + ), f"messages do not match. Expected: {message}, Got: {msg.data}" + assert ( + msg.subject == channel_name + ), f"messages do not match. Expected: {channel_name}, Got: {msg.subject}" + logger.info("connection check complete") + await sub.unsubscribe() + + loop = asyncio.get_event_loop() + loop.run_until_complete(_verify_connection(url, connect_opts)) + self.unit.status = ops.ActiveStatus() + + +if __name__ == "__main__": + ops.main(ApplicationCharm) diff --git a/tests/integration/relation_tests/helpers.py b/tests/integration/relation_tests/helpers.py new file mode 100644 index 0000000..8d30987 --- /dev/null +++ b/tests/integration/relation_tests/helpers.py @@ -0,0 +1,156 @@ +from pathlib import Path +from typing import Literal, Optional + +import yaml +from pytest_operator.plugin import OpsTest +from tenacity import RetryError, Retrying, stop_after_attempt, wait_exponential + +APPLICATION_APP_NAME = "nats-tester" +TEST_APP_CHARM_PATH = "tests/integration/relation_tests/application-charm" +CHARM_NAME = yaml.safe_load(Path("metadata.yaml").read_text())["name"] +APP_NAMES = [APPLICATION_APP_NAME, CHARM_NAME] + + +async def get_alias_from_relation_data( + ops_test: OpsTest, unit_name: str, related_unit_name: str +) -> Optional[str]: + """Get the alias that the unit assigned to the related unit application/cluster. + + Args: + ops_test: The ops test framework instance + unit_name: The name of the unit + related_unit_name: name of the related unit + + Returns: + the alias for the application/cluster of + the related unit + + Raises: + ValueError if it's not possible to get unit data + or if there is no alias on that. + """ + raw_data = (await ops_test.juju("show-unit", related_unit_name))[1] + if not raw_data: + raise ValueError(f"no unit info could be grabbed for {related_unit_name}") + data = yaml.safe_load(raw_data) + + # Retrieve the relation data from the unit. + relation_data = {} + for relation in data[related_unit_name]["relation-info"]: + for name, unit in relation["related-units"].items(): + if name == unit_name: + relation_data = unit["data"] + break + + # Check whether the unit has set an alias for the related unit application/cluster. + if "alias" not in relation_data: + raise ValueError(f"no alias could be grabbed for {related_unit_name} application/cluster") + + return relation_data["alias"] + + +async def get_relation_data( + ops_test: OpsTest, + application_name: str, + relation_name: str, + key: str, + databag: Literal["application", "unit"], + relation_id: str | None = None, + relation_alias: str | None = None, +) -> Optional[str]: + """Get relation data for an application. + + Args: + ops_test: The ops test framework instance + application_name: The name of the application + relation_name: name of the relation to get connection data from + key: key of data to be retrieved + databag: Type of data bag i.e application or unit, to check the key in. Defaults to "unit". + relation_id: id of the relation to get connection data from + relation_alias: alias of the relation (like a connection name) + to get connection data from + + Returns: + the data that was requested or None + if no data in the relation + + Raises: + ValueError if it's not possible to get application data + or if there is no data for the particular relation endpoint + and/or alias. + """ + unit_name = ops_test.model.applications[application_name].units[0].name + raw_data = (await ops_test.juju("show-unit", unit_name))[1] + if not raw_data: + raise ValueError(f"no unit info could be grabbed for {unit_name}") + data = yaml.safe_load(raw_data) + # Filter the data based on the relation name. + relation_data = [v for v in data[unit_name]["relation-info"] if v["endpoint"] == relation_name] + if relation_id: + # Filter the data based on the relation id. + relation_data = [v for v in relation_data if v["relation-id"] == relation_id] + if relation_alias: + # Filter the data based on the cluster/relation alias. + relation_data = [ + v + for v in relation_data + if await get_alias_from_relation_data( + ops_test, unit_name, next(iter(v["related-units"])) + ) + == relation_alias + ] + if len(relation_data) == 0: + raise ValueError( + f"no relation data could be grabbed on relation with endpoint {relation_name} and alias {relation_alias}" + ) + if databag == "application": + return relation_data[0]["application-data"].get(key) + elif databag == "unit": + related_unit = relation_data[0]["related-units"].popitem() + return related_unit[1]["data"].get(key) + else: + raise ValueError("databag can only be of type 'unit' or 'application'") + + +async def check_relation_data_existence( + ops_test: OpsTest, + application_name: str, + relation_name: str, + key: str, + exists: bool = True, + databag: Literal["unit", "application"] = "unit", +) -> bool: + """Check for the existence of a key in the relation data. + + Args: + ops_test: The ops test framework instance + application_name: The name of the application + relation_name: Name of the relation to get relation data from + key: Key of data to be checked + exists: Whether to check for the existence or non-existence + databag: Type of data bag i.e application or unit, to check the key in. Defaults to "unit". + + Returns: + whether the key exists in the relation data + """ + try: + # Retry mechanism used to wait for some events to be triggered, + # like the relation departed event. + for attempt in Retrying( + stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30) + ): + with attempt: + data = await get_relation_data( + ops_test, + application_name, + relation_name, + key, + databag, + ) + if exists: + assert data is not None + else: + assert data is None + return True + except RetryError: + return False diff --git a/tests/integration/relation_tests/test_clustering.py b/tests/integration/relation_tests/test_clustering.py new file mode 100644 index 0000000..c3e87e6 --- /dev/null +++ b/tests/integration/relation_tests/test_clustering.py @@ -0,0 +1,44 @@ +import asyncio + +import pytest +from helpers import ( + APP_NAMES, + APPLICATION_APP_NAME, + TEST_APP_CHARM_PATH, + check_relation_data_existence, +) +from pytest_operator.plugin import OpsTest + +from tests.integration.relation_tests.helpers import CHARM_NAME + + +@pytest.mark.skip_if_deployed +async def test_deploy_cluster(ops_test: OpsTest): + charms = await ops_test.build_charms(".", TEST_APP_CHARM_PATH) + async with ops_test.fast_forward(): + await asyncio.gather( + ops_test.model.deploy( + charms[APPLICATION_APP_NAME], + application_name=APPLICATION_APP_NAME, + config={"check_clustering": "true"}, + num_units=1, + ), + ops_test.model.deploy( + charms[CHARM_NAME], + application_name=CHARM_NAME, + num_units=3, + ), + ) + await ops_test.model.wait_for_idle(apps=APP_NAMES, status="active", timeout=1000) + + +async def test_relation(ops_test: OpsTest): + async with ops_test.fast_forward(): + await ops_test.model.relate(f"{APPLICATION_APP_NAME}:client", f"{CHARM_NAME}:client") + await ops_test.model.wait_for_idle(apps=APP_NAMES, status="active") + await check_relation_data_existence( + ops_test, + APPLICATION_APP_NAME, + "client", + "url", + ) diff --git a/tests/integration/relation_tests/test_tls.py b/tests/integration/relation_tests/test_tls.py new file mode 100644 index 0000000..4ec2a15 --- /dev/null +++ b/tests/integration/relation_tests/test_tls.py @@ -0,0 +1,61 @@ +import asyncio + +import pytest +from helpers import APP_NAMES, APPLICATION_APP_NAME, TEST_APP_CHARM_PATH +from nrpe.client import logging +from pytest_operator.plugin import OpsTest + +from tests.integration.relation_tests.helpers import CHARM_NAME + +TLS_CA_CHARM_NAME = "easyrsa" + + +@pytest.mark.skip_if_deployed +async def test_deploy_tls(ops_test: OpsTest): + charms = await ops_test.build_charms(".", TEST_APP_CHARM_PATH) + async with ops_test.fast_forward(): + await asyncio.gather( + ops_test.model.deploy( + charms[APPLICATION_APP_NAME], + application_name=APPLICATION_APP_NAME, + num_units=1, + ), + ops_test.model.deploy( + charms[CHARM_NAME], + application_name=CHARM_NAME, + num_units=1, + ), + ops_test.model.deploy( + TLS_CA_CHARM_NAME, + application_name=TLS_CA_CHARM_NAME, + channel="stable", + num_units=1, + ), + ) + await ops_test.model.wait_for_idle( + apps=[*APP_NAMES, TLS_CA_CHARM_NAME], status="active", timeout=1000 + ) + + +async def test_tls_enabled(ops_test: OpsTest): + async with ops_test.fast_forward(): + await ops_test.model.relate(f"{TLS_CA_CHARM_NAME}:client", f"{CHARM_NAME}:ca-client") + await ops_test.model.relate(f"{APPLICATION_APP_NAME}:client", f"{CHARM_NAME}:client") + await ops_test.model.wait_for_idle(apps=[*APP_NAMES, TLS_CA_CHARM_NAME], status="active") + + +async def test_secrets(ops_test: OpsTest): + # Check that on juju 3 we have secrets + if hasattr(ops_test.model, "list_secrets"): + logging.info("checking for secrets") + secrets = await ops_test.model.list_secrets() + assert len(secrets.results) > 0, "secrets not found" + else: + pytest.skip("secrets not supported for juju < 3.0") + + +async def test_adding_unit_works(ops_test: OpsTest): + async with ops_test.fast_forward(): + nats_app = ops_test.model.applications[CHARM_NAME] + await nats_app.add_units(count=2) + await ops_test.model.wait_for_idle(apps=[*APP_NAMES, TLS_CA_CHARM_NAME], status="active") diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index ff1b975..b2458e5 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -1,12 +1,29 @@ import logging +from pathlib import Path +import nats +import pytest +import yaml +from nats.errors import Error from pytest_operator.plugin import OpsTest logger = logging.getLogger(__name__) +METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) +APP_NAME = METADATA["name"] + +@pytest.mark.skip_if_deployed async def test_smoke(ops_test: OpsTest): charm = await ops_test.build_charm(".", verbosity="debug") app = await ops_test.model.deploy(charm) await ops_test.model.block_until(lambda: app.status in ("active", "error"), timeout=300) assert app.status, "active" + + +@pytest.mark.asyncio +async def test_configured_with_token(ops_test: OpsTest): + ip = ops_test.model.applications[APP_NAME].units[0].public_address + with pytest.raises(Error) as e: + await nats.connect(f"nats://{ip}:4222", allow_reconnect=False) + assert e.value.args[0] == "nats: 'Authorization Violation'" diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 6e04c7f..871f1c2 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -1,6 +1,6 @@ from pathlib import Path from subprocess import CalledProcessError -from unittest.mock import PropertyMock, patch +from unittest.mock import MagicMock, PropertyMock, patch import pytest from charm import NatsCharm @@ -11,10 +11,11 @@ @pytest.fixture def harness(request): - harness = Harness(NatsCharm) - request.addfinalizer(harness.cleanup) - harness.begin() - yield harness + with patch("nats_config.snap.SnapCache"): + harness = Harness(NatsCharm) + request.addfinalizer(harness.cleanup) + harness.begin() + yield harness @pytest.fixture @@ -39,26 +40,34 @@ def tls_config() -> tuple[bytes, bytes]: ) -def test_on_install_with_snapd_resource(harness: Harness): - with patch("charm.subprocess.check_call") as mocked_cmd, patch( - "charm.SERVER_PATH" - ) as mocked_server_path: +def test_on_install_with_snapd_resource(request): + with patch("nats_config.NATS.SERVER_PATH") as mocked_server_path, patch( + "nats_config.snap" + ) as mocked_snap: + harness = Harness(NatsCharm) + mocked_snap.install_local = MagicMock() + request.addfinalizer(harness.cleanup) + harness.begin() harness.add_resource("core", "core.snap") harness.charm.on.install.emit() - assert mocked_cmd.call_count == 3 mocked_server_path.mkdir.return_value = None mocked_server_path.mkdir.assert_called_once() + mocked_snap.install_local.assert_called_once() -def test_on_install_with_nats_resource(harness: Harness): - with patch("charm.subprocess.check_call") as mocked_cmd, patch( - "charm.SERVER_PATH" +def test_on_install_with_nats_resource(request): + with patch("nats_config.snap") as mocked_snap, patch( + "nats_config.NATS.SERVER_PATH" ) as mocked_server_path: - harness.add_resource("nats", "nats") + harness = Harness(NatsCharm) + mocked_snap.install_local = MagicMock() + request.addfinalizer(harness.cleanup) + harness.begin() + harness.add_resource("nats", "nats.snap") harness.charm.on.install.emit() mocked_server_path.mkdir.return_value = None mocked_server_path.mkdir.assert_called_once() - assert mocked_cmd.call_count == 2 + mocked_snap.install_local.assert_called_once() def DISABLED_test_on_install_snap_failure(harness: Harness): # noqa: N802 @@ -69,10 +78,8 @@ def DISABLED_test_on_install_snap_failure(harness: Harness): # noqa: N802 def test_on_start_all_successfull(harness: Harness): - with patch("charm.subprocess.check_call"): - harness.charm.on.start.emit() - assert harness.model.unit.status == ActiveStatus("") - assert harness.charm.state.is_started + harness.charm.on.start.emit() + assert harness.model.unit.status == ActiveStatus("") def DISABLED_test_block_if_only_one_tls_key_or_cert_given( # noqa: N802 @@ -81,9 +88,9 @@ def DISABLED_test_block_if_only_one_tls_key_or_cert_given( # noqa: N802 with patch( "charm.NatsCluster.listen_address", new_callable=PropertyMock ) as cluster_listen_address, patch( - "charm.NatsClient.listen_address", new_callable=PropertyMock + "charm.NATSClientProvider.listen_address", new_callable=PropertyMock ) as client_listen_address, patch( - "charm.NatsCharm.NATS_SERVER_CONFIG_PATH" + "charm.NATS_SERVER_CONFIG_PATH" ) as mock_config_path, patch( "charm.NatsCharm._open_port" ): @@ -97,7 +104,7 @@ def DISABLED_test_block_if_only_one_tls_key_or_cert_given( # noqa: N802 def test_generate_auth_token(): - token = NatsCharm.get_auth_token(NatsCharm.AUTH_TOKEN_LENGTH) + token = NatsCharm.get_auth_token(64) assert len(token) == 64 @@ -107,29 +114,29 @@ def test_listen_all_addresses_blocks_charm(harness: Harness): with pytest.raises(RuntimeError): with patch("charm.NatsCluster.listen_address", new_callable=PropertyMock), patch( "charm.NatsCluster.ingress_address", new_callable=PropertyMock - ), patch("charm.NatsClient.listen_address", new_callable=PropertyMock), patch( + ), patch("charm.NATSClientProvider.listen_address", new_callable=PropertyMock), patch( "charm.socket.getnameinfo" ) as mocked_hostname: - mocked_hostname.return_value = ("my-nats.com", "1234") + mocked_hostname.return_value = ("my-nats_config.com", "1234") harness.add_relation("ca-client", "easyrsa") - harness.charm.on.config_changed.emit() + harness.charm.ca_client.on.ca_available.emit() assert harness.charm.unit.status == BlockedStatus( "Generating certificates with listen-on-all-addresses option is not supported yet" ) def test_on_config_changed_rewrites_config(tmp_path, harness: Harness): - config_path = tmp_path / "nats.cfg" + config_path = tmp_path / "nats_config.cfg" with patch( "charm.NatsCluster.listen_address", new_callable=PropertyMock, return_value="1.2.3.4" ), patch( "charm.NatsCluster.ingress_address", new_callable=PropertyMock, return_value="1.2.3.4" ), patch( - "charm.NatsCharm.NATS_SERVER_CONFIG_PATH", new=Path(config_path) + "nats_config.NATS.CONFIG_PATH", new=Path(config_path) ) as mock_config_path, patch( - "charm.NatsCharm._open_port", return_value=None - ), patch( - "charm.NatsClient.listen_address", new_callable=PropertyMock, return_value="1.2.3.4" + "charm.NATSClientProvider.listen_address", + new_callable=PropertyMock, + return_value="1.2.3.4", ): harness.update_config( { @@ -152,23 +159,18 @@ def test_on_config_changed_rewrites_config(tmp_path, harness: Harness): def test_writes_nrpe_checks_on_nrpe_available(harness: Harness): - with patch( - "charm.NatsCluster.listen_address", new_callable=PropertyMock, return_value="1.2.3.4" - ), patch( - "charm.NatsCluster.ingress_address", new_callable=PropertyMock, return_value="1.2.3.4" - ), patch( - "charm.NatsClient.listen_address", new_callable=PropertyMock, return_value="1.2.3.4" - ), patch( - "charm.NatsCharm.NATS_SERVER_CONFIG_PATH" - ), patch( - "charm.NatsCharm._open_port", return_value=None - ), patch.object( - harness.charm, "nrpe_client" - ) as mock_client: - mock_client.is_available = PropertyMock(return_value=True) - harness.charm.on.config_changed.emit() - assert mock_client.add_check.called - assert mock_client.commit.called + mocked_check = MagicMock(return_value=None) + mocked_commit = MagicMock(return_value=None) + with patch.multiple( + harness.charm.nrpe_client, + add_check=mocked_check, + commit=mocked_commit, + state=MagicMock(nrpe_ready=True), + ), patch.object(harness.charm, "nats_client", MagicMock()): + harness.charm.nrpe_client.on.nrpe_available.emit() + + assert mocked_check.called, mocked_check.mock_calls + assert mocked_commit.called, mocked_check.mock_calls def test_published_nats_client_data_to_relation(harness: Harness): @@ -177,11 +179,11 @@ def test_published_nats_client_data_to_relation(harness: Harness): ), patch( "charm.NatsCluster.ingress_address", new_callable=PropertyMock, return_value="1.2.3.4" ), patch( - "charm.NatsClient.listen_address", new_callable=PropertyMock, return_value="1.2.3.4" - ), patch( - "charm.NatsCharm.NATS_SERVER_CONFIG_PATH" + "charm.NATSClientProvider.listen_address", + new_callable=PropertyMock, + return_value="1.2.3.4", ), patch( - "charm.NatsCharm._open_port", return_value=None + "nats_config.NATS.CONFIG_PATH" ): with harness.hooks_disabled(): rel = harness.add_relation("client", harness.charm.app.name) diff --git a/tox.ini b/tox.ini index 1d78005..2e81f8d 100644 --- a/tox.ini +++ b/tox.ini @@ -6,7 +6,6 @@ envlist = fmt, lint, unit [vars] src_path = {toxinidir}/src/ tst_path = {toxinidir}/tests/ -lib_path = {toxinidir}/lib/charms/nrpe/ all_path = {[vars]src_path} {[vars]tst_path} [testenv] @@ -24,7 +23,7 @@ passenv = [testenv:fmt] description = Apply coding style standards to code deps = - # renovate: datasource=pypj + # renovate: datasource=pypi black==23.7.0 # renovate: datasource=pypi ruff==0.0.287 @@ -59,7 +58,7 @@ deps = # renovate: datasource=pypi pydantic <= 2.0 # renovate: datasource=pypi - pyOpenSSL==21.0.0 + pyOpenSSL==23.2.0 commands = coverage run --source={[vars]src_path} \ -m pytest \ @@ -77,9 +76,11 @@ deps = pytest==7.4.1 # renovate: datasource=pypi pytest-operator==0.29.0 + nats-py==2.4.0 + tenacity==8.2.3 -r{toxinidir}/requirements.txt commands = - pip install juju=={env:LIBJUJU} + pip install -q juju=={env:LIBJUJU} pytest -v \ -s \ --tb native \