Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
acracker committed May 16, 2024
1 parent fcdf1e1 commit 43fc4e8
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 82 deletions.
2 changes: 1 addition & 1 deletion data_watchtower/api/handlers/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from .base import BaseHandler
from ...utils import get_subclasses
from ...core.base import get_registered_data_loaders
from ... import get_registered_data_loaders


class DataLoaderListHandler(BaseHandler):
Expand Down
32 changes: 32 additions & 0 deletions data_watchtower/api/handlers/validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .base import BaseHandler
from ...utils import get_subclasses
from ... import get_registered_validators


class ValidatorListHandler(BaseHandler):
def get(self):
data = []
validators = get_registered_validators()
for cls in validators:

if isinstance(cls.__doc__, str):
description = cls.__doc__.split("\f")[0].strip()
else:
description = ""
row = dict(
name=cls.__name__,
module_path=cls.module_path(),
schema=cls.to_schema(),
description=description,
)
data.append(row)
result = dict(
records=data
)
self.json(result)
return

def post(self):
return self.get()
77 changes: 54 additions & 23 deletions data_watchtower/api/handlers/watchtower.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
# -*- coding: utf-8 -*-
from .base import BaseHandler
from ...core.watchtower import Watchtower
from ...core.base import get_get_registered_data_loader_maps
from ... import get_registered_data_loader_maps
from ...core import get_registered_validators, get_registered_validator_maps


class WatchtowerHandler(BaseHandler):
Expand All @@ -25,7 +26,7 @@ def get(self):

def post(self):
params = self.json_loads(self.request.body)
data_loader_maps = get_get_registered_data_loader_maps()
data_loader_maps = get_registered_data_loader_maps()
data_loader_cls = data_loader_maps[params.pop('data_loader_cls')]
data_loader = data_loader_cls.from_dict(params.pop('data_loader_params'))
watchtower = Watchtower(name=params.pop('name'), data_loader=data_loader, **params)
Expand All @@ -34,7 +35,7 @@ def post(self):

def put(self):
params = self.json_loads(self.request.body)
data_loader_maps = get_get_registered_data_loader_maps()
data_loader_maps = get_registered_data_loader_maps()
data_loader_cls = data_loader_maps[params.pop('data_loader_cls')]
data_loader = data_loader_cls.from_dict(params.pop('data_loader_params'))
watchtower = Watchtower(name=params.pop('name'), data_loader=data_loader, **params)
Expand All @@ -52,7 +53,8 @@ def get(self):
item['data_loader'] = data_loader.pop('__class__')
item['data_loader_cls'] = item['data_loader'].split(':')[-1]
item['data_loader_params'] = data_loader
data_loader_maps = get_get_registered_data_loader_maps()
data_loader_maps = get_registered_data_loader_maps()
# todo 如果data_loader被删除, 则会报错
data_loader_cls = data_loader_maps[item['data_loader_cls']]
item['data_loader_schema'] = data_loader_cls.to_schema()
if isinstance(item.get('params'), dict):
Expand All @@ -69,23 +71,52 @@ def post(self):
return self.get()


class DataLoaderListHandler(BaseHandler):
def get(self):
name = self.get_argument('name')
data = []
data_loaders = get_registered_data_loaders()
for cls in data_loaders:
row = dict(
name=cls.__name__,
module_path=cls.module_path(),
schema=cls.to_schema(),
)
data.append(row)
result = dict(
records=data
)
self.json(result)
return

class ValidatorRelationHandler(BaseHandler):
def post(self):
return self.get()
"""
处理POST请求,用于向数据库中的watchtower添加新的validator。
接收JSON格式的请求体,包含watchtower的名称(name)、validator的类名(validator),
以及validator的参数(params)。然后根据这些信息,从数据库中获取对应的watchtower,
实例化指定的validator,并将其添加到watchtower中。
参数:
- 无
返回值:
- 如果操作成功,返回一个空的JSON对象;如果watchtower或validator未找到,返回包含错误信息的JSON对象。
"""

# 从请求体中加载JSON数据
data = self.json_loads(self.request.body)
name = data['name'] # watchtower的名称
validator_class_name = data['validator'] # validator的类名
params = data['params'] # validator的参数

# 尝试从数据库获取指定名称的watchtower
wt = self.database.get_watchtower(name)
if not wt:
# 如果watchtower未找到,返回错误信息
self.json(error={'err_code': 1001, 'err_msg': 'watchtower not found'})
return

# 获取注册的validator类
validator_cls = get_registered_validator_maps().get(validator_class_name)
if validator_cls is None:
# 如果validator类未找到,返回错误信息
self.json(error={'err_code': 1002, 'err_msg': 'validator not found'})
return

# 实例化validator,并将其转换为字典格式
validator = validator_cls.from_params(**params)
validator_item = validator.to_dict()

# 从validator字典中提取类名和参数,准备添加到数据库
validator = validator_item['__class__']
params = self.json_dumps(validator_item['params'])

# 将validator添加到watchtower中
self.database.add_validator_to_watchtower(name, validator, params)

# 返回成功的JSON响应
return self.json()
5 changes: 3 additions & 2 deletions data_watchtower/api/url.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .handlers import watchtower, data_loader
from .handlers import watchtower, data_loader, validator

URLS = [
(r"/", data_loader.DataLoaderListHandler),
(r"/", validator.ValidatorListHandler),
(r"/data_watchtower/v1/watchtower", watchtower.WatchtowerHandler),
(r"/data_watchtower/v1/watchtowers", watchtower.WatchtowerListHandler),
(r"/data_watchtower/v1/data_loaders", data_loader.DataLoaderListHandler),
(r"/data_watchtower/v1/validators", validator.ValidatorListHandler),

]
2 changes: 2 additions & 0 deletions data_watchtower/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .data_loaders import get_registered_data_loader_maps, get_registered_data_loaders
from .validators import get_registered_validators, get_registered_validator_maps
47 changes: 16 additions & 31 deletions data_watchtower/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
# -*- coding: utf-8 -*-
import datetime
import logging
from functools import lru_cache
import polars as pl
import pandas as pd
from attrs import define, field, NOTHING
from apischema import settings, schema
from apischema.json_schema import deserialization_schema
from apischema.objects import ObjectField

from ..utils import to_dict, from_dict, to_snake, get_subclasses, load_subclasses
from ..utils import to_dict, from_dict, to_snake

logger = logging.getLogger(__name__)

Expand All @@ -19,13 +18,18 @@

def attrs_fields(cls: type):
if hasattr(cls, "__attrs_attrs__"):
return [
ObjectField(
a.name, a.type, required=a.default == NOTHING, default=a.default,
metadata=schema(**(a.metadata or {})),
result = []
for col in getattr(cls, "__attrs_attrs__"):
metadata = dict(col.metadata) or {}
if 'help' in metadata:
metadata['description'] = metadata.pop('help')
x = ObjectField(
col.name, col.type, required=col.default == NOTHING, default=col.default,
metadata=schema(**metadata),
)
for a in getattr(cls, "__attrs_attrs__")
]
result.append(x)
return result

else:
return prev_default_object_fields(cls)

Expand Down Expand Up @@ -71,29 +75,6 @@ def to_schema(cls):
return deserialization_schema(cls)


@lru_cache()
def get_get_registered_data_loader_maps():
custom_path = ["dw_custom.data_loaders"]
subclasses = get_subclasses(DataLoader)
try:
custom_subclasses = load_subclasses(custom_path, DataLoader)
except ModuleNotFoundError:
custom_subclasses = []
result = {}
for cls in (subclasses + custom_subclasses):
cls_name = cls.__name__
if cls_name in result:
raise ValueError("Duplicate data loader name: %s" % cls_name)
else:
result[cls.__name__] = cls
return result


def get_registered_data_loaders():
result = get_get_registered_data_loader_maps()
return list(result.values())


@define()
class ValidationResult(BaseBean):
success = field(default=None, type=bool)
Expand All @@ -114,6 +95,10 @@ def __init__(self, params: Params):
self.result = None
self.params = params

@classmethod
def to_schema(cls):
return deserialization_schema(cls.Params)

@classmethod
def from_params(cls, **kwargs):
params = cls.Params(**kwargs)
Expand Down
30 changes: 29 additions & 1 deletion data_watchtower/core/data_loaders.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from functools import lru_cache

import pandas as pd
import polars as pl
from attrs import define, field
from ..utils import connect_db_from_url

from .base import DataLoader
from ..utils import connect_db_from_url, get_subclasses, load_subclasses

CUSTOM_DATA_LOADER_PATH = os.getenv("DW_CUSTOM_DATA_LOADER_PATH", "dw_custom.data_loaders")


@define()
Expand All @@ -31,3 +36,26 @@ def _load(self):
connection.close()
database.close()
return data


@lru_cache()
def get_registered_data_loader_maps():
custom_path = CUSTOM_DATA_LOADER_PATH.split(";")
subclasses = get_subclasses(DataLoader)
try:
custom_subclasses = load_subclasses(custom_path, DataLoader)
except ModuleNotFoundError:
custom_subclasses = []
result = {}
for cls in (subclasses + custom_subclasses):
cls_name = cls.__name__
if cls_name in result:
raise ValueError("Duplicate data loader name: %s" % cls_name)
else:
result[cls.__name__] = cls
return result


def get_registered_data_loaders():
result = get_registered_data_loader_maps()
return list(result.values())
4 changes: 3 additions & 1 deletion data_watchtower/core/macro.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import logging
import datetime
from data_watchtower.utils import load_object
Expand Down Expand Up @@ -36,7 +37,8 @@

}
try:
custom_macro = load_object('dw_custom.macros:DEFAULT_MACRO_CONFIG')
dw_custom_macro_config = os.getenv("DW_CUSTOM_MACRO_CONFIG", 'dw_custom.macros:DEFAULT_MACRO_CONFIG')
custom_macro = load_object(dw_custom_macro_config)
DEFAULT_MACRO_CONFIG.update(custom_macro)
logger.info('custom macros loaded. count:%s' % len(custom_macro))
except (ModuleNotFoundError, NameError):
Expand Down
33 changes: 31 additions & 2 deletions data_watchtower/core/validators.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,43 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import datetime
from functools import lru_cache
from attrs import define, field
from .base import Validator, ValidationResult

from ..utils import get_subclasses, load_subclasses

CUSTOM_VALIDATOR_PATH = os.getenv("DW_CUSTOM_VALIDATOR_PATH", "dw_custom.data_loaders")


@lru_cache()
def get_registered_validator_maps():
custom_path = CUSTOM_VALIDATOR_PATH.split(";")
subclasses = get_subclasses(Validator)
try:
custom_subclasses = load_subclasses(custom_path, Validator)
except ModuleNotFoundError:
custom_subclasses = []
result = {}
for cls in (subclasses + custom_subclasses):
cls_name = cls.__name__
if cls_name in result:
raise ValueError("Duplicate data loader name: %s" % cls_name)
else:
result[cls.__name__] = cls
return result


def get_registered_validators():
result = get_registered_validator_maps()
return list(result.values())


class ExpectColumnValuesToNotBeNull(Validator):
@define()
class Params:
column = field()
column = field(type=str)

def __init__(self, params: Params):
super().__init__(params)
Expand All @@ -33,7 +62,7 @@ class ExpectColumnRecentlyUpdated(Validator):

@define()
class Params:
update_time_column = field(metadata={'help': 'update_time字段名称'})
update_time_column = field(type=str, metadata={'help': 'update_time字段名称'})
days = field(default=0, type=int)
hours = field(default=0, type=int)

Expand Down
Loading

0 comments on commit 43fc4e8

Please sign in to comment.