Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging intermittent failure fix and enhancement by capturing memory stats #48

Merged
merged 4 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:

# excluded logging as per https://github.com/MetOffice/dagrunner/issues/39
- name: Run pytest + coverage report gen
run: pytest --cov=dagrunner --cov-report=term --cov-report=html --ignore=dagrunner/tests/utils/logging/test_integration.py | tee coverage_output.txt; test ${PIPESTATUS[0]} -eq 0
run: pytest --cov=dagrunner --cov-report=term --cov-report=html | tee coverage_output.txt; test ${PIPESTATUS[0]} -eq 0


# TESTS (main branch)
Expand All @@ -68,7 +68,7 @@ jobs:
if: steps.cache-ref-coverage.outputs.cache-hit != 'true'
run: |
cd ref
pytest --cov=dagrunner --cov-report=term --cov-report=html --ignore=dagrunner/tests/utils/logging/test_integration.py | tee coverage_output.txt; test ${PIPESTATUS[0]} -eq 0
pytest --cov=dagrunner --cov-report=term --cov-report=html | tee coverage_output.txt; test ${PIPESTATUS[0]} -eq 0

# TESTS (compare coverage)

Expand Down
10 changes: 6 additions & 4 deletions dagrunner/execute_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dagrunner.plugin_framework import NodeAwarePlugin
from dagrunner.runner.schedulers import SCHEDULERS
from dagrunner.utils import (
CaptureProcMemory,
TimeIt,
function_to_argparse,
logger,
Expand Down Expand Up @@ -141,10 +142,11 @@ def plugin_executor(
print(msg)
res = None
if not dry_run:
with TimeIt() as timer:
with dask.config.set(scheduler="single-threaded"):
res = callable_obj(*args, **callable_kwargs)
msg = f"{str(timer)}; {msg}"
with TimeIt() as timer, dask.config.set(
scheduler="single-threaded"
), CaptureProcMemory() as mem:
res = callable_obj(*args, **callable_kwargs)
msg = f"{str(timer)}; {msg}; {mem.max()}"
logging.info(msg)

if verbose:
Expand Down
28 changes: 9 additions & 19 deletions dagrunner/tests/utils/logging/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import os
import sqlite3
import subprocess
import time

import pytest

Expand All @@ -31,25 +30,16 @@ def gen_client_code(loggers):
return code


@pytest.mark.parametrize(
"test_inputs",
[
(
("Python is versatile and powerful.", "root", "info"),
("Lists store collections of items.", "myapp.area1", "debug"),
("Functions encapsulate reusable code.", "myapp.area1", "info"),
("Indentation defines code blocks.", "myapp.area2", "warning"),
("Libraries extend Pythons capabilities.", "myapp.area2", "error"),
),
],
)
def test_sqlitedb(test_inputs, sqlite_filepath, caplog):
def test_sqlitedb(sqlite_filepath, caplog):
test_inputs = (
["Python is versatile and powerful.", "root", "info"],
["Lists store collections of items.", "myapp.area1", "debug"],
["Functions encapsulate reusable code.", "myapp.area1", "info"],
["Indentation defines code blocks.", "myapp.area2", "warning"],
["Libraries extend Pythons capabilities.", "myapp.area2", "error"],
)
client_code = gen_client_code(test_inputs)

with ServerContext(sqlite_filepath=sqlite_filepath):
# Wait for server to start
time.sleep(0.5)
# Run client in subprocess
with ServerContext(sqlite_filepath=sqlite_filepath, verbose=True):
subprocess.run(
["python", "-c", client_code], capture_output=True, text=True, check=True
)
Expand Down
32 changes: 32 additions & 0 deletions dagrunner/tests/utils/test_CaptureProcMemory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'dagrunner' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
import time
from unittest.mock import mock_open, patch

from dagrunner.utils import CaptureProcMemory

# this is what each open call will return
READ_DATA_LIST = [
"""VmPeak: 1024 kB
VmSize: 6144 kB
VmHWM: 3072 kB
VmRSS: 8192 kB
""",
"""VmPeak: 5120 kB
VmSize: 2048 kB
VmHWM: 7168 kB
VmRSS: 4096 kB
""",
]


def test_all():
    """Max of each Vm* field across two mocked /proc/<pid>/status snapshots."""
    first, second = (mock_open(read_data=data) for data in READ_DATA_LIST)
    # Sample under the first snapshot, then swap the mocked file content and
    # let the capture thread sample again; max() must combine both.
    with patch("builtins.open", first), CaptureProcMemory(interval=0.01) as mem:
        time.sleep(0.02)
        with patch("builtins.open", second):
            time.sleep(0.02)
    expected = {"VmPeak": 5.0, "VmSize": 6.0, "VmHWM": 7.0, "VmRSS": 8.0}
    assert mem.max() == expected
40 changes: 40 additions & 0 deletions dagrunner/tests/utils/test_CaptureSysMemory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'dagrunner' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
import time
from unittest.mock import mock_open, patch

from dagrunner.utils import CaptureSysMemory

# this is what each open call will return
READ_DATA_LIST = [
"""Committed_AS: 1024 kB
MemFree: 6144 kB
Buffers: 3072 kB
Cached: 8192 kB
MemTotal: 1024 kB
""",
"""Committed_AS: 5120 kB
MemFree: 2048 kB
Buffers: 7168 kB
Cached: 4096 kB
MemTotal: 2048 kB
""",
]


def test_all():
    """Max of each field across two mocked /proc/meminfo snapshots."""
    first, second = (mock_open(read_data=data) for data in READ_DATA_LIST)
    # Sample under the first snapshot, then swap the mocked file content and
    # let the capture thread sample again; max() must combine both.
    with patch("builtins.open", first), CaptureSysMemory(interval=0.01) as mem:
        time.sleep(0.02)
        with patch("builtins.open", second):
            time.sleep(0.02)
    expected = {
        "Committed_AS": 5.0,
        "MemFree": 6.0,
        "Buffers": 7.0,
        "Cached": 8.0,
        "MemTotal": 2.0,
    }
    assert mem.max() == expected
49 changes: 49 additions & 0 deletions dagrunner/tests/utils/test_get_proc_mem_stat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'dagrunner' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
from unittest.mock import mock_open, patch

from dagrunner.utils import get_proc_mem_stat

proc_status = """Name: bash
Umask: 0022
State: S (sleeping)
Tgid: 95395
Ngid: 0
Pid: 95395
PPid: 81896
TracerPid: 0
Uid: 10234 10234 10234 10234
Gid: 1000 1000 1000 1000
FDSize: 256
Groups: 39 203 216 1000 1460 6790 11250
VmPeak: 130448 kB
VmSize: 130448 kB
VmLck: 0 kB
VmPin: 0 kB
VmHWM: 4384 kB
VmRSS: 3108 kB
RssAnon: 2632 kB
RssFile: 476 kB
RssShmem: 0 kB
VmData: 2496 kB
VmStk: 144 kB
VmExe: 888 kB
VmLib: 2188 kB
VmPTE: 76 kB
VmSwap: 0 kB
Threads: 1
"""


def test_all():
    """Parse the four Vm* fields from a mocked /proc/<pid>/status, in MB."""
    with patch("builtins.open", mock_open(read_data=proc_status)):
        result = get_proc_mem_stat()
    # Expected values are the kB figures from the fixture divided by 1024.
    assert result == {
        "VmPeak": 127.390625,
        "VmSize": 127.390625,
        "VmHWM": 4.28125,
        "VmRSS": 3.03515625,
    }
33 changes: 33 additions & 0 deletions dagrunner/tests/utils/test_get_sys_mem_stat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'dagrunner' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
from unittest.mock import mock_open, patch

from dagrunner.utils import get_sys_mem_stat

sys_status = """MemTotal: 7989852 kB
MemFree: 688700 kB
MemAvailable: 1684112 kB
Buffers: 0 kB
Cached: 1526676 kB
SwapCached: 19880 kB
Active: 4136476 kB
Inactive: 2345780 kB
Active(anon): 3494656 kB
Inactive(anon): 1706028 kB
Active(file): 641820 kB
Inactive(file): 639752 kB
"""


def test_all():
    """Parse the selected fields from a mocked /proc/meminfo, in MB."""
    with patch("builtins.open", mock_open(read_data=sys_status)):
        result = get_sys_mem_stat()
    # Expected values are the kB figures from the fixture divided by 1024;
    # Committed_AS is absent from the fixture so it is absent here too.
    assert result == {
        "MemTotal": 7802.58984375,
        "MemFree": 672.55859375,
        "Buffers": 0.0,
        "Cached": 1490.89453125,
    }
120 changes: 120 additions & 0 deletions dagrunner/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,131 @@
# See LICENSE in the root of the repository for full licensing details.
import argparse
import inspect
import os
import threading
import time
from abc import ABC, abstractmethod

import dagrunner.utils._doc_styles as doc_styles


def get_proc_mem_stat(pid=None):
    """
    Get process memory statistics from /proc/<pid>/status.

    More information can be found at
    https://github.com/torvalds/linux/blob/master/Documentation/filesystems/proc.txt

    Args:
    - `pid`: Process id.  Optional.  Default is the current process at call
      time.  (The previous `pid=os.getpid()` default was evaluated once at
      import time, so processes forked after import would silently report the
      parent's statistics.)

    Returns:
    - Dictionary with memory statistics in MB. Fields are VmSize, VmRSS, VmPeak and
      VmHWM.

    """
    if pid is None:
        pid = os.getpid()
    status_path = f"/proc/{pid}/status"
    memory_stats = {}
    with open(status_path, "r") as file:
        for line in file:
            # Only the four fields of interest are extracted; /proc reports
            # them in kB, which we convert to MB.
            if line.startswith(("VmSize:", "VmRSS:", "VmPeak:", "VmHWM:")):
                key, value = line.split(":", 1)
                memory_stats[key.strip()] = (
                    float(value.split()[0].strip()) / 1024.0
                )  # convert kb to mb
    return memory_stats


class _CaptureMemory(ABC):
    """
    Context manager that samples memory statistics in a background thread and
    records the maximum value seen for each field.

    Subclasses provide the sampling callable via the abstract `METHOD`
    property; extra keyword arguments given to `__init__` are forwarded to
    that callable on every sample.
    """

    def __init__(self, interval=1.0, **kwargs):
        # `interval`: seconds between samples (also the upper bound on how
        # long __exit__ waits for the sampling thread to notice the stop).
        self._interval = interval
        # Per-field running maxima; empty until the first sample is taken.
        self._max_memory_stats = {}
        # Signals the sampling thread to stop (set in __exit__).
        self._stop_event = threading.Event()
        # Keyword arguments forwarded to METHOD on each sample.
        self._params = kwargs

    @property
    @abstractmethod
    def METHOD(self):
        # Subclasses return the zero-or-kwargs callable that produces a
        # {field: value} dict per sample (e.g. get_proc_mem_stat).
        pass

    def _capture_memory(self):
        # Runs on the background thread: sample, fold into the maxima, then
        # sleep until the next interval or until stopped.
        while not self._stop_event.is_set():
            current_stats = self.METHOD(**self._params)
            if not self._max_memory_stats:
                # First sample: initialise every field so the comparison
                # below always has a baseline.
                self._max_memory_stats = {key: 0 for key in current_stats}
            for key in current_stats:
                if current_stats[key] > self._max_memory_stats[key]:
                    self._max_memory_stats[key] = current_stats[key]
            # Wait for the interval or until stop event is set
            if self._stop_event.wait(self._interval):
                break

    def __enter__(self):
        # Start sampling immediately; the first sample happens right away,
        # not after the first interval.
        self._thread = threading.Thread(target=self._capture_memory)
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Stop the sampler and wait for it to finish so that max() reflects
        # all samples taken within the context.
        self._stop_event.set()
        self._thread.join()

    def max(self):
        """Return the per-field maxima observed so far (dict, MB)."""
        return self._max_memory_stats


class CaptureProcMemory(_CaptureMemory):
    """
    Capture maximum process memory statistics.

    See `get_proc_mem_stat` for more information.
    """

    @property
    def METHOD(self):
        # Sampling callable polled by the background thread.
        return get_proc_mem_stat

    def __init__(self, interval=1.0, pid=None):
        """
        Args:
        - `interval`: seconds between samples.
        - `pid`: process to monitor.  Optional.  Defaults to the current
          process, resolved at construction time.  (The previous
          `pid=os.getpid()` default was evaluated once at class-definition
          time, so processes forked after import would monitor the parent.)
        """
        if pid is None:
            pid = os.getpid()
        super().__init__(interval=interval, pid=pid)


def get_sys_mem_stat():
    """
    Get system memory statistics from /proc/meminfo.

    More information can be found at
    https://github.com/torvalds/linux/blob/master/Documentation/filesystems/proc.txt

    Returns:
    - Dictionary with memory statistics in MB. Fields are Committed_AS, MemFree,
      Buffers, Cached and MemTotal.

    """
    fields = ("Committed_AS:", "MemFree:", "Buffers:", "Cached:", "MemTotal:")
    memory_stats = {}
    with open("/proc/meminfo", "r") as meminfo:
        for entry in meminfo:
            if not entry.startswith(fields):
                continue
            name, _, remainder = entry.partition(":")
            # /proc/meminfo reports values in kB; expose them in MB.
            memory_stats[name.strip()] = float(remainder.split()[0].strip()) / 1024.0
    return memory_stats


class CaptureSysMemory(_CaptureMemory):
    """
    Capture maximum system memory statistics.

    See `get_sys_mem_stat` for more information.
    """

    @property
    def METHOD(self):
        # Sampling callable polled by the background thread.
        return get_sys_mem_stat


class ObjectAsStr(str):
"""Hide object under a string."""

Expand Down
Loading
Loading