Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

immune system #28

Merged
merged 1 commit into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/source/dev.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Dev
- fix coding convention
```
ruff check --fix .
```
1 change: 0 additions & 1 deletion sonagent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def __init__(self, memory_path, skills, config: dict) -> None:
self.skills_dict = {}
logger.info("--------- Start Done.---------")


openai = self.config.get('openai')
if openai.get('api_type', None) == 'openai':
self.chat_service = OpenAIChatCompletion(
Expand Down
Empty file added sonagent/immune/__init__.py
Empty file.
60 changes: 60 additions & 0 deletions sonagent/immune/immune.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging
from datetime import datetime
from typing import Any, Dict, Optional

from sonagent.loggers import bufferHandler
from sonagent.utils.datetime_helpers import format_date

logger = logging.getLogger(__name__)


class ImmuneSystem:
"""
Class for Self Immune System that allows self-monitoring and self-recovery of the system.
"""
error_logs = []

@staticmethod
def get_logs(limit: Optional[int]) -> Dict[str, Any]:
"""Returns the last X logs"""
if limit:
buffer = bufferHandler.buffer[-limit:]
else:
buffer = bufferHandler.buffer
records = [[format_date(datetime.fromtimestamp(r.created)),
r.created * 1000, r.name, r.levelname,
r.message + ('\n' + r.exc_text if r.exc_text else '')]
for r in buffer]
return {'log_count': len(records), 'logs': records}

def immune_scan(self) -> None:
"""
Scan the system for any anomalies.
"""
logs = ImmuneSystem.get_logs(5)

# Check for any errors in the logs
errors = [log for log in logs['logs'] if log[3] == 'ERROR']

errors_detected = []
for error in errors:
if error not in ImmuneSystem.error_logs:
ImmuneSystem.error_logs.append(error)
errors_detected.append(error)

if len(errors_detected) > 0:
logger.info(f"Errors detected: {errors_detected}")
else:
logger.debug("No errors detected.")

def immune_recover(self) -> None:
"""
Recover the system from any anomalies.
"""
if ImmuneSystem.error_logs:
print(f"Recovering from errors: {ImmuneSystem.error_logs}")
logger.info(f"Recovering from errors: {ImmuneSystem.error_logs}")
ImmuneSystem.error_logs = []
else:
print("No errors detected.")
logger.info("No errors detected.")
2 changes: 1 addition & 1 deletion sonagent/sonbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,4 @@ def notify_status(self, msg: str, msg_type=RPCMessageType.STATUS) -> None:
self.rpc.send_msg({
'type': msg_type,
'status': msg
})
})
6 changes: 6 additions & 0 deletions sonagent/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sonagent.enums.enums import State
from sonagent.enums.rpcmessagetype import RPCMessageType
from sonagent.exceptions import OperationalException, TemporaryError
from sonagent.immune.immune import ImmuneSystem
from sonagent.sonbot import SonBot

logger = logging.getLogger(__name__)
Expand All @@ -28,10 +29,12 @@ def __init__(self, args: Dict[str, Any], config: Optional[dict] = None) -> None:
self._init(False)

self._heartbeat_msg: float = 0
self._immune = ImmuneSystem()

# Tell systemd that we completed initialization phase
self._notify("READY=1")


def _init(self, reconfig: bool) -> None:
# Init the instance of the bot
self.sonbot = SonBot(self._config, args=self._args)
Expand Down Expand Up @@ -106,6 +109,9 @@ def _worker(self, old_state: Optional[State]) -> State:
)
self._heartbeat_msg = now

# Ping Immune System for scan
self._immune.immune_scan()

return state

def _throttle(
Expand Down
Loading