Skip to content

Commit

Permalink
数据库管理和账号管理支持MongoDB (#2108)
Browse files Browse the repository at this point in the history
* add mongo db type management support

* 更新功能清单支持MongoDB账号管理

* refomat code by black

* add unittest for MySQL&MongoDB management function

---------

Co-authored-by: 小圈圈 <rtttte@qq.com>
  • Loading branch information
quanbisen and hhyo authored Apr 5, 2023
1 parent f611f03 commit efbaa81
Show file tree
Hide file tree
Showing 11 changed files with 1,008 additions and 352 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
| Redis || × || × | × | × | × | × | × | × |
| PgSQL || × || × | × | × | × | × | × | × |
| Oracle |||||| × | × | × | × | × |
| MongoDB |||| × | × | × | × | × | × | × |
| MongoDB |||| × | × | × | × | | × | × |
| Phoenix || × || × | × | × | × | × | × | × |
| ODPS || × | × | × | × | × | × | × | × | × |
| ClickHouse |||| × | × | × | × | × | × | × |
Expand Down
20 changes: 20 additions & 0 deletions sql/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,26 @@ def get_tables_metas_data(self, db_name, **kwargs):
"""获取数据库所有表格信息,用作数据字典导出接口"""
return list()

def get_all_databases_summary(self):
"""实例数据库管理功能,获取实例所有的数据库描述信息"""
return ResultSet()

def get_instance_users_summary(self):
"""实例账号管理功能,获取实例所有账号信息"""
return ResultSet()

def create_instance_user(self, **kwargs):
"""实例账号管理功能,创建实例账号"""
return ResultSet()

def drop_instance_user(self, **kwargs):
"""实例账号管理功能,删除实例账号"""
return ResultSet()

def reset_instance_user_pwd(self, **kwargs):
"""实例账号管理功能,重置实例账号密码"""
return ResultSet()

def get_all_columns_by_tb(self, db_name, tb_name, **kwargs):
"""获取所有字段, 返回一个ResultSet,rows=list"""
return ResultSet()
Expand Down
100 changes: 100 additions & 0 deletions sql/engines/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -1270,3 +1270,103 @@ def kill_op(self, opids):
f"mongodb语句执行killOp报错,语句:db.runCommand({sql}) ,错误信息{traceback.format_exc()}"
)
result.error = str(e)

def get_all_databases_summary(self):
"""实例数据库管理功能,获取实例所有的数据库描述信息"""
query_result = self.get_all_databases()
if not query_result.error:
dbs = query_result.rows
conn = self.get_connection()

# 获取数据库用户信息
rows = []
for db_name in dbs:
# 执行语句
listing = conn[db_name].command(command="usersInfo")
grantees = []
for user_obj in listing["users"]:
grantees.append(
{"user": user_obj["user"], "roles": user_obj["roles"]}.__str__()
)
row = {
"db_name": db_name,
"grantees": grantees,
"saved": False,
}
rows.append(row)
query_result.rows = rows
return query_result

def get_instance_users_summary(self):
"""实例账号管理功能,获取实例所有账号信息"""
query_result = self.get_all_databases()
if not query_result.error:
dbs = query_result.rows
conn = self.get_connection()

# 获取数据库用户信息
rows = []
for db_name in dbs:
# 执行语句
listing = conn[db_name].command(command="usersInfo")
for user_obj in listing["users"]:
rows.append(
{
"db_name_user": f"{db_name}.{user_obj['user']}",
"db_name": db_name,
"user": user_obj["user"],
"roles": [role["role"] for role in user_obj["roles"]],
"saved": False,
}
)
query_result.rows = rows
return query_result

def create_instance_user(self, **kwargs):
"""实例账号管理功能,创建实例账号"""
exec_result = ResultSet()
db_name = kwargs.get("db_name", "")
user = kwargs.get("user", "")
password1 = kwargs.get("password1", "")
remark = kwargs.get("remark", "")
try:
conn = self.get_connection()
conn[db_name].command("createUser", user, pwd=password1, roles=[])
exec_result.rows = [
{
"instance": self.instance,
"db_name": db_name,
"user": user,
"password": password1,
"remark": remark,
}
]
except Exception as e:
exec_result.error = str(e)
return exec_result

def drop_instance_user(self, db_name_user: str, **kwarg):
"""实例账号管理功能,删除实例账号"""
arr = db_name_user.split(".")
db_name = arr[0]
user = arr[1]
exec_result = ResultSet()
try:
conn = self.get_connection()
conn[db_name].command("dropUser", user)
except Exception as e:
exec_result.error = str(e)
return exec_result

def reset_instance_user_pwd(self, db_name_user: str, reset_pwd: str, **kwargs):
"""实例账号管理功能,重置实例账号密码"""
arr = db_name_user.split(".")
db_name = arr[0]
user = arr[1]
exec_result = ResultSet()
try:
conn = self.get_connection()
conn[db_name].command("updateUser", user, pwd=reset_pwd)
except Exception as e:
exec_result.error = str(e)
return exec_result
106 changes: 106 additions & 0 deletions sql/engines/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,112 @@ def get_tables_metas_data(self, db_name, **kwargs):
table_metas.append(_meta)
return table_metas

def get_bind_users(self, db_name: str):
sql_get_bind_users = f"""select group_concat(distinct(GRANTEE)),TABLE_SCHEMA
from information_schema.SCHEMA_PRIVILEGES
where TABLE_SCHEMA='{db_name}'
group by TABLE_SCHEMA;"""
return self.query(
"information_schema", sql_get_bind_users, close_conn=False
).rows

def get_all_databases_summary(self):
"""实例数据库管理功能,获取实例所有的数据库描述信息"""
# 获取所有数据库
sql_get_db = """SELECT SCHEMA_NAME,DEFAULT_CHARACTER_SET_NAME,DEFAULT_COLLATION_NAME
FROM information_schema.SCHEMATA
WHERE SCHEMA_NAME NOT IN ('information_schema', 'performance_schema', 'mysql', 'test', 'sys');"""
query_result = self.query("information_schema", sql_get_db, close_conn=False)
if not query_result.error:
dbs = query_result.rows
# 获取数据库关联用户信息
rows = []
for db in dbs:
bind_users = self.get_bind_users(db_name=db[0])
row = {
"db_name": db[0],
"charset": db[1],
"collation": db[2],
"grantees": bind_users[0][0].split(",") if bind_users else [],
"saved": False,
}
rows.append(row)
query_result.rows = rows
return query_result

def get_instance_users_summary(self):
"""实例账号管理功能,获取实例所有账号信息"""
server_version = self.server_version
# MySQL 5.7.6版本起支持ACCOUNT LOCK
if server_version >= (5, 7, 6):
sql_get_user = "select concat('`', user, '`', '@', '`', host,'`') as query,user,host,account_locked from mysql.user;"
else:
sql_get_user = "select concat('`', user, '`', '@', '`', host,'`') as query,user,host from mysql.user;"
query_result = self.query("mysql", sql_get_user)
if not query_result.error:
db_users = query_result.rows
# 获取用户权限信息
rows = []
for db_user in db_users:
user_host = db_user[0]
user_priv = self.query(
"mysql", "show grants for {};".format(user_host), close_conn=False
).rows
row = {
"user_host": user_host,
"user": db_user[1],
"host": db_user[2],
"privileges": user_priv,
"saved": False,
"is_locked": db_user[3] if server_version >= (5, 7, 6) else None,
}
rows.append(row)
query_result.rows = rows
return query_result

def create_instance_user(self, **kwargs):
"""实例账号管理功能,创建实例账号"""
# escape
user = MySQLdb.escape_string(kwargs.get("user", "")).decode("utf-8")
host = MySQLdb.escape_string(kwargs.get("host", "")).decode("utf-8")
password1 = MySQLdb.escape_string(kwargs.get("password1", "")).decode("utf-8")
remark = kwargs.get("remark", "")
# 在一个事务内执行
hosts = host.split("|")
create_user_cmd = ""
accounts = []
for host in hosts:
create_user_cmd += (
f"create user '{user}'@'{host}' identified by '{password1}';"
)
accounts.append(
{
"instance": self.instance,
"user": user,
"host": host,
"password": password1,
"remark": remark,
}
)
exec_result = self.execute(db_name="mysql", sql=create_user_cmd)
exec_result.rows = accounts
return exec_result

def drop_instance_user(self, user_host: str, **kwarg):
"""实例账号管理功能,删除实例账号"""
# escape
user_host = MySQLdb.escape_string(user_host).decode("utf-8")
return self.execute(db_name="mysql", sql=f"DROP USER {user_host};")

def reset_instance_user_pwd(self, user_host: str, reset_pwd: str, **kwargs):
"""实例账号管理功能,重置实例账号密码"""
# escape
user_host = MySQLdb.escape_string(user_host).decode("utf-8")
reset_pwd = MySQLdb.escape_string(reset_pwd).decode("utf-8")
return self.execute(
db_name="mysql", sql=f"ALTER USER {user_host} IDENTIFIED BY '{reset_pwd}';"
)

def get_all_columns_by_tb(self, db_name, tb_name, **kwargs):
"""获取所有字段, 返回一个ResultSet"""
sql = f"""SELECT
Expand Down
Loading

0 comments on commit efbaa81

Please sign in to comment.