-
Notifications
You must be signed in to change notification settings - Fork 53
/
kclogger.py
117 lines (96 loc) · 3.53 KB
/
kclogger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import logging
from typing import Callable
from logging import handlers
from pathlib import Path
DEBUG = True
LOGFILE = Path(__file__).parent / "log.log"
# NAME = __package__
NAME = "SDN"
L = logging.WARNING
if DEBUG:
L = logging.DEBUG
FMTDICT = {
'DEBUG': ["[36m", "DBG"],
'INFO': ["[37m", "INF"],
'WARN': ["[33m", "WRN"],
'WARNING': ["[33m", "WRN"],
'ERROR': ["[31m", "ERR"],
'CRITICAL': ["[35m", "CRT"],
}
class KcHandler(logging.StreamHandler):
with_same_line = False
def emit(self, record):
try:
msg = self.format(record)
stream = self.stream
is_same_line = getattr(record, "same_line", False)
was_same_line = self.with_same_line
self.with_same_line = is_same_line
# 上次不是 这次是 则打印到新行, 但下次打印到同一行(除非再次设置为False)
if was_same_line and not is_same_line:
# 上次是 sameline 但这次不是 则补换行
stream.write(self.terminator)
end = "" if is_same_line else self.terminator
stream.write(msg + end)
self.flush()
except RecursionError:
raise
except Exception:
self.handleError(record)
class KcFilter(logging.Filter):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.translate_func = lambda _: _
def fill_color(self, color_code="[37m", msg=""):
return f'\033{color_code}{msg}\033[0m'
def filter(self, record: logging.LogRecord) -> bool:
# 颜色map
color_code, level_shortname = FMTDICT.get(record.levelname, ["[37m", "UN"])
record.msg = self.translate_func(record.msg)
record.msg = self.fill_color(color_code, record.msg)
record.levelname = self.fill_color(color_code, level_shortname)
return True
class KcLogger(logging.Logger):
def __init__(self, name, level=logging.NOTSET):
super().__init__(name, level)
def set_translate(self, translate_func):
for handler in self.handlers:
for filter in handler.filters:
if not isinstance(filter, KcFilter):
continue
filter.translate_func = translate_func
def close(self):
for h in reversed(self.handlers[:]):
try:
try:
h.acquire()
h.flush()
h.close()
except (OSError, ValueError):
pass
finally:
h.release()
except BaseException:
...
def getLogger(name="CLOG", level=logging.INFO, fmt='[%(name)s-%(levelname)s]: %(message)s', fmt_date="%H:%M:%S") -> KcLogger:
fmter = logging.Formatter('[%(levelname)s]:%(filename)s>%(lineno)s: %(message)s')
# 按 D/H/M 天时分 保存日志, backupcount 为保留数量
dfh = handlers.TimedRotatingFileHandler(filename=LOGFILE, when='D', backupCount=2)
dfh.setLevel(logging.DEBUG)
dfh.setFormatter(fmter)
# 命令行打印
filter = KcFilter()
fmter = logging.Formatter(fmt, fmt_date)
ch = KcHandler()
ch.setLevel(level)
ch.setFormatter(fmter)
ch.addFilter(filter)
l = KcLogger(name)
l.setLevel(level)
# 防止卸载模块后重新加载导致 重复打印
if not l.hasHandlers():
# 注意添加顺序, ch有filter, 如果fh后添加 则会默认带上ch的filter
l.addHandler(dfh)
l.addHandler(ch)
return l
logger = getLogger(NAME, L)