Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New scanner #16

Merged
merged 3 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Config/basic.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# 是否忽略以 IO.IngorePrefix 开头和以 IO.IngoreSuffix 结尾的文件
# 这项文件禁用给构建器工程提供了方便的禁用手段
IO.EnableIngore: True
IO.IngorePrefix: ['#','.']
IO.IngoreSuffix: ['.disabled']

# 是否读取符号链接
IO.EnableSymbolink: False

# 当遇到符号链接的时候是否停止并报错
IO.EnableSymbolinkError: False
4 changes: 4 additions & 0 deletions Core/IO/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from typing import List,T
from .scanner import File,Dire
from .writer import regist_filewrite_function,write_string
from .reader import regist_fileread_function,read_string,read_yaml
80 changes: 80 additions & 0 deletions Core/IO/reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
'''
文件IO模块
'''
from typing import List,Tuple,Dict,Union
import os
import json
import yaml
from Core.debug import Logger

logger = Logger('IO')

def read_string(filepath:str):
'''读取字符串文件'''
with open(filepath, "r+",encoding="utf-8") as file:
return file.read()

def write_string(filepath:str,data:str):
'''写入字符串文件'''
with open(filepath, "w",encoding="utf-8") as file:
return file.write(data)

def read_json(filepath) -> dict:
''' 读取 Json 文件 '''
if not os.path.exists(filepath):
raise FileNotFoundError(f'{filepath} not exist!')
with open(filepath,encoding='utf-8') as f:
data = json.load(f)
return data

def read_yaml(filepath:str) -> dict:
''' 读取 Yaml 文件 '''
if not os.path.exists(filepath):
raise FileNotFoundError(f'{filepath} not exist!')
with open(filepath,encoding='utf-8') as f:
data = yaml.load(f,Loader=yaml.FullLoader)
if data is None:
data = {}
data.update({'file_path':filepath})
return data

def read_python(filepath:str) -> callable:
''' 读取 Python 文件 '''
from Core.module_manager import reg_script
if not os.path.exists(filepath):
raise FileNotFoundError(f'{filepath} not exist!')
return reg_script(filepath)


read_func_mapping = {
'yml':read_yaml,
'yaml':read_yaml,
'json':read_json,
'py':read_python,
'python':read_python,
}

def read(file,func = None,usecache:bool = True):
'''读取文件'''
if not func:
if usecache and file.data:
return file.data
if file.extention in read_func_mapping:
func = read_func_mapping[file.extention]
else:
func = read_string
file.data = func(file.abs_path)
return file.data
return func(file.abs_path)

def regist_fileread_function(func:callable,file_extens:str) -> None:
'''注册读取后缀名为 `file_extens` 的文件的函数'''
def reg_fileread(func,file_exten:str):
read_func_mapping[file_exten] = func
if isinstance(file_extens,list):
for exten in file_extens:
reg_fileread(func,exten)
elif isinstance(file_extens,str):
reg_fileread(func,file_extens)
else:
raise TypeError()
125 changes: 125 additions & 0 deletions Core/IO/scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import re
import os
import sys
from typing import List,Dict,Union
from Core.config import config
from .reader import read
from .writer import write

ALL = re.compile('.*')
SEP = os.path.sep

ENABLE_SYMLINK = config('IO.EnableSymbolink')
ENABLE_SYMLINK_ERROR = config('IO.EnableSymbolinkError')
ENABLE_INGORE = config('IO.EnableIngore')
INGORE_PREFIX = config('IO.IngorePrefix')
INGORE_SUFFIX = config('IO.IngoreSuffix')

class SymbolLinkError(Exception):
'''Target is a symbol link'''

class IsAFileError(Exception):
'''Operation dosen't work on files'''

class File():
def __init__(self,abs_path,read_init = False):
self.abs_path = abs_path
self.fullname = os.path.basename(abs_path)
self.name,self.extention = os.path.splitext(self.fullname)
self.extention = self.extention.removeprefix('.')
self.data = None
if read_init:
self.read()

def __str__(self):
return f"File({self.fullname})"

def read(self,func = None,usecache:bool = True):
'''读取文件'''
return read(self,func,usecache)

def write(self,*args,func = None,**kwargs):
'''写入文件'''
return read(self,*args,func,**kwargs)

class Dire():
def __init__(self,abs_path):
if not os.path.exists(abs_path):
raise FileNotFoundError(f'{abs_path} not exist')
if not os.path.isdir(abs_path):
raise IsAFileError(f'{abs_path} is a file')
self.abs_path = abs_path
self.files = self.dires = None
self.name = os.path.basename(abs_path)
self.files:dict[str,File] = {}
self.dires:dict[str,'Dire'] = {}
self.__self_scan()

def __add_node(self,path) -> None:
basename = os.path.basename(path)
if os.path.isfile(path):
self.files[basename] = File(path)
elif os.path.isdir(path):
self.dires[basename] = Dire(path)
elif os.path.islink(path):
if ENABLE_SYMLINK_ERROR:
self.__add_node(os.readlink(path))
elif ENABLE_SYMLINK_ERROR:
raise SymbolLinkError(f'{path} is a symbol link')
else:
raise NotImplementedError(f'Cannot judge type of {path}, it is not a file, directory, or a symbol link.')

def __should_be_ingore(self,name):
basename = os.path.basename(name)
for prefix in INGORE_PREFIX:
if basename.startswith(prefix):
return True
for suffix in INGORE_SUFFIX:
if basename.endswith(suffix):
return True
return False

def __self_scan(self):
self.files = {}
self.dires = {}
for rel_path in os.listdir(self.abs_path):
if ENABLE_INGORE:
if self.__should_be_ingore(rel_path):
continue
self.__add_node(f'{self.abs_path}{SEP}{rel_path}')

def scan_subdir(self,patten:Union[str|re.Pattern] = ALL):
'''和 `scan` 效果相似,函数会返回该文件夹所有下边的一层文件夹的指定文件列表'''
return self.scan(patten=patten,recur=True,min_recur_deepth=1,max_recur_deepth=1)

def scan(self,patten:Union[str|re.Pattern] = ALL,
recur:bool=False,dire_patten:Union[str|re.Pattern] = ALL,
min_recur_deepth:int = 0,max_recur_deepth:Union[int|None] = sys.maxsize
) -> List[File]:
'''遍历文件夹下所有文件夹所有文件, 读取其中符合正则表达式的文件, 最后以列表形式输出
## 参数
### 常规
* `[patten]` 文件需要满足的正则,默认 `".*"`
### 递归模式
* `[recur]` 指示是否递归子文件夹,默认 `False`
* `[dire_patten]` 文件夹需要满足的正则,默认 `".*"`
* `[min_recur_deepth]` 遍历的最小深度,默认为 `0`
* `[max_recur_deepth]` 遍历的最大深度,默认为 `sys.maxsize`
'''
if not (self.files or self.dires):
self.__self_scan()
output = []
if min_recur_deepth <= 0 :
for filename in self.files:
if re.match(patten,filename):
output.append(self.files[filename])
if recur:
for direname in self.dires:
if re.match(dire_patten,direname)\
and max_recur_deepth > 0:
output.extend(self.dires[direname].scan(
patten=patten,recur=recur,dire_patten=dire_patten,
min_recur_deepth = min_recur_deepth - 1,
max_recur_deepth = max_recur_deepth - 1
))
return output
28 changes: 28 additions & 0 deletions Core/IO/writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
def write_string(filepath:str,data:str):
'''写入字符串文件'''
with open(filepath, "w",encoding="utf-8") as file:
return file.write(data)

write_func_mapping = {}

def write(file,*arg,func = None,**kwarg):
'''写入文件'''
if not func:
if file.extention in write_func_mapping:
func = write_func_mapping[file.extention]
else:
func = write_string
return file.data
return func(file.abs_path,*arg,**kwarg)

def regist_filewrite_function(func:callable,file_extens:str) -> None:
'''注册写入后缀名为 `file_extens` 的文件的函数'''
def reg_filewrite(func,file_exten:str):
write_func_mapping[file_exten] = func
if isinstance(file_extens,list):
for exten in file_extens:
reg_filewrite(func,exten)
elif isinstance(file_extens,str):
reg_filewrite(func,file_extens)
else:
raise TypeError()
24 changes: 24 additions & 0 deletions Core/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os
import yaml

config_dict = {}
def config(key):
return config_dict.get(key)

def partly_init():
global config_dict
envpath = os.path.dirname(os.path.dirname(__file__))
filepath = f"{envpath}{os.path.sep}Config{os.path.sep}basic.yml"
if not os.path.exists(filepath):
raise FileNotFoundError(f'Config file {filepath} not exist! Please re-install the program')
with open(filepath,encoding='utf-8') as f:
data:dict = yaml.load(f,Loader=yaml.FullLoader)
config_dict = data

def fully_init():
from .IO import Dire
files = Dire(os.path.dirname(os.path.dirname(__file__))).scan(recur=True,patten=r'.*\.yml')
for file in files:
config_dict.update(file.read())

partly_init()
28 changes: 20 additions & 8 deletions Core/i18n.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,38 @@
import os
from locale import getlocale as get_sys_locale
from string import Template
from .io import scan_dire, try_scan_dire
from .IO import Dire
from .debug import Logger

locales = {}
syslang,_ = get_sys_locale()
logger = Logger('i18n')

def init(locales_tree):
'''初始化'''
envpath = os.path.dirname(os.path.dirname(__file__))
file_infos = scan_dire(f"{envpath}{os.path.sep}i18n",)
for data,lang,_ in file_infos:
locales_tree[lang] = data
i18n_dire = Dire(f"{envpath}{os.path.sep}i18n",)
files = i18n_dire.scan()
for file in files:
locales_tree[file.name] = file.read()

def append_locale(path):
'''加载更多本地化包'''
locales_tree = locales
file_infos = try_scan_dire(path)
for data,lang,_ in file_infos:
try:
dire = Dire(path)
except FileNotFoundError:
return
except Exception as ex:
logger.error(ex)
return
files = dire.scan()
for file in files:
lang = file.name
if lang in locales_tree:
locales_tree[lang].update(data)
locales_tree[lang].update(file.read())
else:
locales_tree[lang] = data
locales_tree[lang] = file.read()

def locale(key:str,*args,lang:str=syslang,**kwargs):
'''从键值获取字符串'''
Expand Down
Loading