Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto-generate ruleset cache on source change #2133

Merged
merged 19 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master (unreleased)

### New Features
- regenerate ruleset cache automatically on source change (only in dev mode) #2133 @s-ff

- add landing page https://mandiant.github.io/capa/ @williballenthin #2310
- add rules website https://mandiant.github.io/capa/rules @DeeyaSingh #2310
Expand Down
61 changes: 61 additions & 0 deletions capa/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import os
import sys
import gzip
import inspect
Expand All @@ -13,6 +14,7 @@
import importlib.util
from typing import Dict, Union, BinaryIO, Iterator, NoReturn
from pathlib import Path
from datetime import datetime

import tqdm
import msgspec.json
Expand Down Expand Up @@ -291,3 +293,62 @@ def is_running_standalone() -> bool:
# so we keep this in a common area.
# generally, other library code should not use this function.
return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS")


def is_dev_environment() -> bool:
    """
    detect whether capa is running from a source checkout rather than an installed copy

    returns:
      True only if all of the following hold:
        - `is_running_standalone()` is False
        - this module does not live below a `site-packages` or `dist-packages` directory
        - a `.git` entry exists in the parent directory of the package directory
    """
    if is_running_standalone():
        return False

    # compare whole path components, so that a directory whose name merely
    # contains "site-packages" is not mistaken for an installation.
    # NOTE(review): `dist-packages` is presumably where Debian-style system installs
    # end up; handling it is cheap - confirm it is relevant in practice.
    module_path_parts = Path(__file__).parts
    if "site-packages" in module_path_parts or "dist-packages" in module_path_parts:
        # running from a site-packages (or dist-packages) installation
        return False

    capa_root = Path(__file__).resolve().parent.parent

    # `.git` is a directory in a regular clone, but a plain file ("gitfile")
    # in linked worktrees and submodules, so only test for existence.
    return (capa_root / ".git").exists()


def is_cache_newer_than_rule_code(cache_dir: Path) -> bool:
    """
    basic check to prevent issues if the rules cache is older than relevant rules code

    the rules code files are located relative to this module, not relative to the
    current working directory, so the check works wherever capa is invoked from.

    args:
      cache_dir: the cache directory containing cache files

    returns:
      True if the most recently modified cache file is at least as new as the most
      recently modified rules code file.
      False if there are no cache files, or if the rules code was modified after the cache.
    """

    # retrieve the latest modified cache file
    cache_files = list(cache_dir.glob("*.cache"))
    if not cache_files:
        logger.debug("no rule cache files found")
        return False

    latest_cache_file = max(cache_files, key=os.path.getmtime)
    cache_timestamp = os.path.getmtime(latest_cache_file)

    # these are the relevant rules code files that could conflict with using an outdated cache.
    # anchor them on the directory of this module: a path relative to the working
    # directory raises FileNotFoundError as soon as capa is started from elsewhere.
    rules_code_dir = Path(__file__).resolve().parent / "rules"
    rule_code_files = [rules_code_dir / "__init__.py", rules_code_dir / "cache.py"]
    latest_rule_code_file = max(rule_code_files, key=os.path.getmtime)
    rule_code_timestamp = os.path.getmtime(latest_rule_code_file)

    if rule_code_timestamp > cache_timestamp:

        def ts_to_str(ts):
            # human readable local time, only used for the warning below
            return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")

        logger.warning(
            "latest rule code file %s (%s) is newer than the latest rule cache file %s (%s)",
            latest_rule_code_file,
            ts_to_str(rule_code_timestamp),
            latest_cache_file,
            ts_to_str(cache_timestamp),
        )
        return False

    return True
11 changes: 10 additions & 1 deletion capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,13 +612,22 @@ def get_rules_from_cli(args) -> RuleSet:
raises:
ShouldExitError: if the program is invoked incorrectly and should exit.
"""
enable_cache: bool = True
try:
if capa.helpers.is_running_standalone() and args.is_default_rules:
cache_dir = get_default_root() / "cache"
else:
cache_dir = capa.rules.cache.get_default_cache_directory()

rules = capa.rules.get_rules(args.rules, cache_dir=cache_dir)
if capa.helpers.is_dev_environment():
# using the rules cache during development may result in unexpected errors, see #1898
enable_cache = capa.helpers.is_cache_newer_than_rule_code(cache_dir)
mr-tz marked this conversation as resolved.
Show resolved Hide resolved
if not enable_cache:
logger.debug("not using cache. delete the cache file manually to use rule caching again")
mr-tz marked this conversation as resolved.
Show resolved Hide resolved
else:
logger.debug("cache can be used, no potentially outdated cache files found")

rules = capa.rules.get_rules(args.rules, cache_dir=cache_dir, enable_cache=enable_cache)
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
logger.error(
Expand Down
9 changes: 6 additions & 3 deletions capa/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2130,12 +2130,14 @@ def get_rules(
rule_paths: List[RulePath],
cache_dir=None,
on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default,
enable_cache: bool = True,
) -> RuleSet:
"""
args:
rule_paths: list of paths to rules files or directories containing rules files
cache_dir: directory to use for caching rules, or will use the default detected cache directory if None
on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation
enable_cache: enable loading of a cached ruleset (default: True)
"""
if cache_dir is None:
cache_dir = capa.rules.cache.get_default_cache_directory()
Expand All @@ -2147,9 +2149,10 @@ def get_rules(
# rule_file_paths[i] corresponds to rule_contents[i].
rule_contents = [file_path.read_bytes() for file_path in rule_file_paths]

ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents)
if ruleset is not None:
return ruleset
if enable_cache:
ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents)
if ruleset is not None:
return ruleset

rules: List[Rule] = []

Expand Down
6 changes: 6 additions & 0 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import codecs

import capa.helpers
from capa.features.extractors import helpers


Expand Down Expand Up @@ -64,3 +65,8 @@ def test_generate_symbols():
symbols = list(helpers.generate_symbols("ws2_32", "#1", include_dll=False))
assert len(symbols) == 1
assert "ws2_32.#1" in symbols


def test_is_dev_environment():
    """the environment the tests run in is expected to be detected as a dev environment"""
    detected_as_dev = capa.helpers.is_dev_environment()
    assert detected_as_dev is True
40 changes: 40 additions & 0 deletions tests/test_rule_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import os
import textwrap
import contextlib
from pathlib import Path

import capa.rules
import capa.helpers
import capa.rules.cache

R1 = capa.rules.Rule.from_yaml(
Expand Down Expand Up @@ -113,3 +116,40 @@ def test_ruleset_cache_invalid():
assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is None
# the invalid cache should be deleted
assert not path.exists()


def test_rule_cache_dev_environment():
    """
    a rule cache file that is older than the rules code must be reported as outdated

    the cache is written to a scratch directory, so the test never deletes
    cache files from the default cache directory.
    """
    import tempfile
    from datetime import datetime

    def ts_to_str(ts):
        return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")

    # generate rules cache
    rs = capa.rules.RuleSet([R2])
    content = capa.rules.cache.get_ruleset_content(rs)
    cache_id = capa.rules.cache.compute_cache_identifier(content)

    capa_root = Path(__file__).resolve().parent.parent
    rules_code_dir = capa_root / "capa" / "rules"
    cachepy = rules_code_dir / "cache.py"  # alternative: rules_code_dir / "__init__.py"

    with tempfile.TemporaryDirectory() as tmp:
        # a fresh directory contains no stale cache files, so nothing has to be cleared
        cache_dir = Path(tmp)
        cache_path = capa.rules.cache.get_cache_path(cache_dir, cache_id)

        capa.rules.cache.cache_ruleset(cache_dir, rs)
        assert cache_path.exists()

        # the cache was written just now, so it is newer than the rules code
        assert capa.helpers.is_cache_newer_than_rule_code(cache_dir) is True

        # set cache's last modified time prior to code file's modified time
        os.utime(cache_path, (cache_path.stat().st_atime, cachepy.stat().st_mtime - 600000))

        # debug: show the timestamps that are being compared
        for g in (rules_code_dir.glob("*.py"), cache_dir.glob("*.cache")):
            for p in g:
                print(p, "\t", ts_to_str(p.stat().st_mtime))  # noqa: T201

        assert capa.helpers.is_dev_environment() is True
        assert capa.helpers.is_cache_newer_than_rule_code(cache_dir) is False