Skip to content

Commit

Permalink
支持mongodb进程状态查看 (#1563)
Browse files Browse the repository at this point in the history
* 修复下载权限名字错误

* 支持mongodb进程状态查看

* mongodb进程状态查看、kill功能迁移到engine中

* mongodb进程状态查bug修复

Co-authored-by: Leo Q <LeoQuote@users.noreply.github.com>
Co-authored-by: 小圈圈 <rtttte@qq.com>
  • Loading branch information
3 people authored Jun 10, 2022
1 parent ba1fdb6 commit d1ce601
Show file tree
Hide file tree
Showing 7 changed files with 478 additions and 121 deletions.
24 changes: 23 additions & 1 deletion common/static/dist/js/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,28 @@ var dateFormat = function(fmt, date) {
return fmt;
};

// 格式与高亮json格式的字符串
var jsonHighLight = function(json) {
json = json.toString().replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;');
return json.replace(/("(\\u[a-zA-Z0-9]{4}|\\[^u]|[^\\"])*"(\s*:)?|\b(true|false|null)\b|-?\d+(?:\.\d*)?(?:[eE][+\-]?\d+)?)/g, function (match) {
var cls = 'text-muted';
if (/^"/.test(match)) {
if (/:$/.test(match)) {
cls = 'text-success';
} else {
match = match
cls = 'text-primary';
}
} else if (/true|false/.test(match)) {
cls = 'text-success';
} else if (/null/.test(match)) {
cls = 'text-warning';
}
return '<span class="' + cls + '">' + match + '</span>';
});
};

// 这个函数存在报错因此不应该把任何模块放在这个模块之后
// 实例配置页面根据db_type选择显示或隐藏mode字段mode字段只适用于redis实例
(function($) {
$(function() {
Expand All @@ -40,4 +62,4 @@ var dateFormat = function(fmt, date) {
toggleMode($(this).val());
});
});
})(django && django.jQuery || jQuery);
})(django && django.jQuery || jQuery);
33 changes: 33 additions & 0 deletions common/utils/extend_json_encoder.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# -*- coding: UTF-8 -*-
import base64
import simplejson as json

from decimal import Decimal
from datetime import datetime, date, timedelta
from functools import singledispatch
from ipaddress import IPv4Address, IPv6Address
from uuid import UUID
from bson.objectid import ObjectId
from bson.timestamp import Timestamp


@singledispatch
Expand Down Expand Up @@ -58,6 +61,16 @@ def _(o):
return str(o)


@convert.register(ObjectId)
def _(o):
return str(o)


@convert.register(Timestamp)
def _(o):
return str(o)


class ExtendJSONEncoder(json.JSONEncoder):
def default(self, obj):
try:
Expand All @@ -76,3 +89,23 @@ def default(self, obj):
return convert(obj)
except TypeError:
return super(ExtendJSONEncoderFTime, self).default(obj)


# 使用simplejson处理形如 b'\xaa' 的bytes类型数据会失败,但使用json模块构造这个对象时不能使用bigint_as_string方法
import json
class ExtendJSONEncoderBytes(json.JSONEncoder):
def default(self, obj):
try:
# 使用convert.register处理会报错 ValueError: Circular reference detected
# 不是utf-8格式的bytes格式需要先进行base64编码转换
if isinstance(obj, bytes):
try:
return o.decode('utf-8')
except:
return base64.b64encode(obj).decode('utf-8')
else:
return convert(obj)
except TypeError:
print(type(obj))
return super(ExtendJSONEncoderBytes, self).default(obj)

152 changes: 101 additions & 51 deletions sql/db_diagnostic.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import logging
import traceback
import MySQLdb

import simplejson as json
#import simplejson as json
import json
from django.contrib.auth.decorators import permission_required

from django.http import HttpResponse

from sql.engines import get_engine
from common.utils.extend_json_encoder import ExtendJSONEncoder
from common.utils.extend_json_encoder import ExtendJSONEncoder, ExtendJSONEncoderBytes
from sql.utils.resource_group import user_instances
from .models import AliyunRdsConfig, Instance

from .aliyun_rds import process_status as aliyun_process_status, create_kill_session as aliyun_create_kill_session, \
kill_session as aliyun_kill_session, sapce_status as aliyun_sapce_status

logger = logging.getLogger('default')

# 问题诊断--进程列表
@permission_required('sql.process_view', raise_exception=True)
Expand All @@ -21,94 +24,129 @@ def process(request):
command_type = request.POST.get('command_type')

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

base_sql = "select id, user, host, db, command, time, state, ifnull(info,'') as info from information_schema.processlist"
# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_process_status(request)
else:
# escape
command_type = MySQLdb.escape_string(command_type).decode('utf-8')

if command_type == 'All':
sql = base_sql + ";"
elif command_type == 'Not Sleep':
sql = "{} where command<>'Sleep';".format(base_sql)
query_engine = get_engine(instance=instance)
query_result = None
if instance.db_type == 'mysql':
base_sql = "select id, user, host, db, command, time, state, ifnull(info,'') as info from information_schema.processlist"
# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_process_status(request)
else:
sql = "{} where command= '{}';".format(base_sql, command_type)
query_engine = get_engine(instance=instance)
query_result = query_engine.query('information_schema', sql)
# escape
command_type = MySQLdb.escape_string(command_type).decode('utf-8')
if not command_type:
command_type = 'Query'
if command_type == 'All':
sql = base_sql + ";"
elif command_type == 'Not Sleep':
sql = "{} where command<>'Sleep';".format(base_sql)
else:
sql = "{} where command= '{}';".format(base_sql, command_type)

query_result = query_engine.query('information_schema', sql)

elif instance.db_type == 'mongo':
query_result = query_engine.current_op(command_type)
print(query_result)

else:
result = {'status': 1, 'msg': '暂时不支持%s类型数据库的进程列表查询' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

if query_result:
if not query_result.error:
processlist = query_result.to_dict()
result = {'status': 0, 'msg': 'ok', 'rows': processlist}
else:
result = {'status': 1, 'msg': query_result.error}

# 返回查询结果
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
# ExtendJSONEncoderBytes 使用json模块,bigint_as_string只支持simplejson
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoderBytes),
content_type='application/json')


# 问题诊断--通过进程id构建请求
# 问题诊断--通过线程id构建请求 这里只是用于确定将要kill的线程id还在运行
@permission_required('sql.process_kill', raise_exception=True)
def create_kill_session(request):
instance_name = request.POST.get('instance_name')
thread_ids = request.POST.get('ThreadIDs')

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

result = {'status': 0, 'msg': 'ok', 'data': []}
# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_create_kill_session(request)
query_engine = get_engine(instance=instance)
if instance.db_type == 'mysql':
# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_create_kill_session(request)
else:
thread_ids = json.loads(thread_ids)

sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\
.format(','.join(str(tid) for tid in thread_ids))
all_kill_sql = query_engine.query('information_schema', sql)
kill_sql = ''
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]
result['data'] = kill_sql

elif instance.db_type == 'mongo':
kill_command = query_engine.get_kill_command(json.loads(thread_ids))
result['data'] = kill_command

else:
thread_ids = json.loads(thread_ids)
query_engine = get_engine(instance=instance)
sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\
.format(','.join(str(tid) for tid in thread_ids))
all_kill_sql = query_engine.query('information_schema', sql)
kill_sql = ''
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]
result['data'] = kill_sql
result = {'status': 1, 'msg': '暂时不支持%s类型数据库通过进程id构建请求' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')
# 返回查询结果
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
content_type='application/json')


# 问题诊断--终止会话
# 问题诊断--终止会话 这里是实际执行kill的操作
@permission_required('sql.process_kill', raise_exception=True)
def kill_session(request):
instance_name = request.POST.get('instance_name')
thread_ids = request.POST.get('ThreadIDs')
result = {'status': 0, 'msg': 'ok', 'data': []}

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_kill_session(request)
engine = get_engine(instance=instance)
if instance.db_type == 'mysql':
# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_kill_session(request)
else:
thread_ids = json.loads(thread_ids)

sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\
.format(','.join(str(tid) for tid in thread_ids))
all_kill_sql = engine.query('information_schema', sql)
kill_sql = ''
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]
engine.execute('information_schema', kill_sql)

elif instance.db_type == 'mongo':
engine.kill_op(json.loads(thread_ids))

else:
thread_ids = json.loads(thread_ids)
engine = get_engine(instance=instance)
sql = "select concat('kill ', id, ';') from information_schema.processlist where id in ({});"\
.format(','.join(str(tid) for tid in thread_ids))
all_kill_sql = engine.query('information_schema', sql)
kill_sql = ''
for row in all_kill_sql.rows:
kill_sql = kill_sql + row[0]
engine.execute('information_schema', kill_sql)
result = {'status': 1, 'msg': '暂时不支持%s类型数据库终止会话' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

# 返回查询结果
return HttpResponse(json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
Expand All @@ -121,11 +159,15 @@ def tablesapce(request):
instance_name = request.POST.get('instance_name')

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

if instance.db_type != 'mysql':
result = {'status': 1, 'msg': '暂时不支持%s类型数据库的表空间信息查询' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

# 判断是RDS还是其他实例
if AliyunRdsConfig.objects.filter(instance=instance, is_enable=True).exists():
result = aliyun_sapce_status(request)
Expand Down Expand Up @@ -164,11 +206,15 @@ def trxandlocks(request):
instance_name = request.POST.get('instance_name')

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

if instance.db_type != 'mysql':
result = {'status': 1, 'msg': '暂时不支持%s类型数据库的锁等待查询' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

query_engine = get_engine(instance=instance)
server_version = query_engine.server_version
if server_version < (8, 0, 1):
Expand Down Expand Up @@ -247,11 +293,15 @@ def innodb_trx(request):
instance_name = request.POST.get('instance_name')

try:
instance = user_instances(request.user, db_type=['mysql']).get(instance_name=instance_name)
instance = user_instances(request.user).get(instance_name=instance_name)
except Instance.DoesNotExist:
result = {'status': 1, 'msg': '你所在组未关联该实例', 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

if instance.db_type != 'mysql':
result = {'status': 1, 'msg': '暂时不支持%s类型数据库的长事务查询' % instance.db_type , 'data': []}
return HttpResponse(json.dumps(result), content_type='application/json')

query_engine = get_engine(instance=instance)
sql = '''select trx.trx_started,
trx.trx_state,
Expand Down
5 changes: 4 additions & 1 deletion sql/engines/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ def json(self):
def to_dict(self):
tmp_list = []
for r in self.rows:
tmp_list += [dict(zip(self.column_list, r))]
if isinstance(r,dict):
tmp_list += [r]
else:
tmp_list += [dict(zip(self.column_list, r))]
return tmp_list

def to_sep_dict(self):
Expand Down
Loading

0 comments on commit d1ce601

Please sign in to comment.