Skip to content

Commit

Permalink
task命名修改,增加在线查询超时时间配置,超时连接主动关闭 #100
Browse files Browse the repository at this point in the history
  • Loading branch information
hhyo committed Apr 14, 2019
1 parent 3eb81ef commit 8bc29b3
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 70 deletions.
10 changes: 10 additions & 0 deletions common/templates/config.html
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ <h5 style="color: darkgrey"><b>SQL查询</b></h5>
</div>
</div>
</div>
<div class="form-group">
<label for="inception_remote_backup_port"
class="col-sm-4 control-label">MAX_EXECUTION_TIME</label>
<div class="col-sm-5">
<input type="number" class="form-control" id="max_execution_time"
key="max_execution_time"
value="{{ config.max_execution_time }}"
placeholder="在线查询超时时间阈值,单位秒,默认60">
</div>
</div>
<div class="form-group">
<label for="admin_query_limit"
class="col-sm-4 control-label">ADMIN_QUERY_LIMIT</label>
Expand Down
65 changes: 39 additions & 26 deletions sql/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
from django.db import connection, OperationalError
from django.db.models import Q
from django.http import HttpResponse
from django_q.tasks import async_task, fetch

from common.config import SysConfig
from common.utils.extend_json_encoder import ExtendJSONEncoder
from sql.query_privileges import query_priv_check
from .models import QueryLog, Instance
from sql.engines import get_engine
from sql.engines import get_engine, ResultSet

logger = logging.getLogger('default')

Expand Down Expand Up @@ -48,6 +49,7 @@ def query(request):
return HttpResponse(json.dumps(result), content_type='application/json')

try:
config = SysConfig()
# 查询前的检查,禁用语句检查,语句切分
query_engine = get_engine(instance=instance)
query_check_info = query_engine.query_check(db_name=db_name, sql=sql_content)
Expand All @@ -56,7 +58,7 @@ def query(request):
result['status'] = 1
result['msg'] = query_check_info.get('msg')
return HttpResponse(json.dumps(result), content_type='application/json')
if query_check_info.get('has_star') and SysConfig().get('disable_star') is True:
if query_check_info.get('has_star') and config.get('disable_star') is True:
# 引擎内部判断为有 * 且禁止 * 选项打开
result['status'] = 1
result['msg'] = query_check_info.get('msg')
Expand All @@ -79,38 +81,49 @@ def query(request):
# 对查询sql增加limit限制或者改写语句
sql_content = query_engine.filter_sql(sql=sql_content, limit_num=limit_num)

# 执行查询语句,统计执行时间
t_start = time.time()
query_result = query_engine.query(db_name=str(db_name), sql=sql_content, limit_num=limit_num)
t_end = time.time()
query_result.query_time = "%5s" % "{:.4f}".format(t_end - t_start)
# 执行查询语句
max_execution_time = int(config.get('max_execution_time', 60))
query_task_id = async_task(query_engine.query, db_name=str(db_name), sql=sql_content, limit_num=limit_num,
timeout=max_execution_time)
# 等待执行结果,仅等待max_execution_time时长
query_task = fetch(query_task_id, wait=max_execution_time * 1000)
# 在max_execution_time内执行结束
if query_task:
query_result = query_task.result
query_result.query_time = query_task.time_taken()
# 等待超时,async_task主动关闭连接
else:
query_result = ResultSet(full_sql=sql_content)
query_result.error = '查询超时,已被主动终止!'

# 数据脱敏,仅对查询无错误的结果集进行脱敏
if SysConfig().get('data_masking') and query_result.error is None:
try:
# 记录脱敏时间
t_start = time.time()
masking_result = query_engine.query_masking(db_name=db_name, sql=sql_content, resultset=query_result)
t_end = time.time()
masking_result.mask_time = "%5s" % "{:.4f}".format(t_end - t_start)
# 脱敏出错,并且开启query_check,直接返回异常,禁止执行
if masking_result.error and SysConfig().get('query_check'):
result['status'] = 1
result['msg'] = masking_result.error
# 脱敏出错,关闭query_check,忽略错误信息,返回未脱敏数据,权限校验标记为跳过
elif masking_result.error and not SysConfig().get('query_check'):
query_result.error = None
priv_check = False
result['data'] = query_result.__dict__
if config.get('data_masking') and query_result.error is None:
query_masking_task_id = async_task(query_engine.query_masking, db_name=db_name, sql=sql_content,
resultset=query_result)
query_masking_task = fetch(query_masking_task_id, wait=-1)
if query_masking_task.success:
masking_result = query_masking_task.result
masking_result.mask_time = query_masking_task.time_taken()
# 脱敏出错
if masking_result.error:
# 开启query_check,直接返回异常,禁止执行
if config.get('query_check'):
result['status'] = 1
result['msg'] = masking_result.error
# 关闭query_check,忽略错误信息,返回未脱敏数据,权限校验标记为跳过
else:
query_result.error = None
priv_check = False
result['data'] = query_result.__dict__
# 正常脱敏
else:
result['data'] = masking_result.__dict__
except Exception as e:
else:
logger.error(f'数据脱敏异常,查询语句:{sql_content}\n,错误信息:{traceback.format_exc()}')
# 抛出未定义异常,并且开启query_check,直接返回异常,禁止执行
if SysConfig().get('query_check'):
if config.get('query_check'):
result['status'] = 1
result['msg'] = f'数据脱敏异常,请联系管理员,错误信息:{e}'
result['msg'] = f'数据脱敏异常,请联系管理员,错误信息:{query_masking_task.result}'
# 关闭query_check,忽略错误信息,返回未脱敏数据,权限校验标记为跳过
else:
query_result.error = None
Expand Down
13 changes: 6 additions & 7 deletions sql/sql_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
from sql.notify import notify_for_audit
from sql.models import ResourceGroup, Users
from sql.utils.resource_group import user_groups, user_instances
from sql.utils.jobs import add_sqlcronjob, del_sqlcronjob
from sql.utils.tasks import add_sql_schedule, del_schedule
from sql.utils.sql_review import can_timingtask, can_cancel, can_execute
from sql.utils.sql_utils import get_syntax_type
from sql.utils.workflow_audit import Audit
from .models import SqlWorkflow, SqlWorkflowContent, Instance
from django_q.tasks import async_task
Expand Down Expand Up @@ -308,7 +307,7 @@ def timing_task(request):
return render(request, 'error.html', context)

run_date = datetime.datetime.strptime(run_date, "%Y-%m-%d %H:%M")
job_id = Const.workflowJobprefix['sqlreview'] + '-' + str(workflow_id)
task_name = f"{Const.workflowJobprefix['sqlreview']}-{workflow_id}"

# 使用事务保持数据一致性
try:
Expand All @@ -317,7 +316,7 @@ def timing_task(request):
workflow_detail.status = 'workflow_timingtask'
workflow_detail.save()
# 调用添加定时任务
add_sqlcronjob(job_id, run_date, workflow_id)
add_sql_schedule(task_name, run_date, workflow_id)
# 增加工单日志
audit_id = Audit.detail_by_workflow_id(workflow_id=workflow_id,
workflow_type=WorkflowDict.workflow_type[
Expand Down Expand Up @@ -396,10 +395,10 @@ def cancel(request):
else:
raise PermissionDenied

# 删除定时执行job
# 删除定时执行task
if workflow_detail.status == 'workflow_timingtask':
job_id = Const.workflowJobprefix['sqlreview'] + '-' + str(workflow_id)
del_sqlcronjob(job_id)
task_name = f"{Const.workflowJobprefix['sqlreview']}-{workflow_id}"
del_schedule(task_name)
# 将流程状态修改为人工终止流程
workflow_detail.status = 'workflow_abort'
workflow_detail.save()
Expand Down
4 changes: 2 additions & 2 deletions sql/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import sql.sql_optimize
from common import auth, config, workflow, dashboard, check
from sql import views, sql_workflow, sql_analyze, query, slowlog, instance, db_diagnostic, resource_group, binlog
from sql.utils import jobs
from sql.utils import tasks

urlpatterns = [
path('', views.sqlworkflow),
Expand Down Expand Up @@ -53,7 +53,7 @@
path('sqlworkflow_list/', sql_workflow.sql_workflow_list),
path('simplecheck/', sql_workflow.check),
path('getWorkflowStatus/', sql_workflow.get_workflow_status),
path('del_sqlcronjob/', jobs.del_sqlcronjob),
path('del_sqlcronjob/', tasks.del_schedule),

path('sql_analyze/generate/', sql_analyze.generate),
path('sql_analyze/analyze/', sql_analyze.analyze),
Expand Down
33 changes: 0 additions & 33 deletions sql/utils/jobs.py

This file was deleted.

35 changes: 35 additions & 0 deletions sql/utils/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding:utf-8 -*-
from django_q.tasks import schedule
from django_q.models import Schedule

import logging

logger = logging.getLogger('default')


def add_sql_schedule(name, run_date, workflow_id):
"""添加/修改sql定时任务"""
del_schedule(name)
schedule('sql.utils.execute_sql.execute', workflow_id,
hook='sql.utils.execute_sql.execute_callback',
name=name, schedule_type='O', next_run=run_date, repeats=1)
logger.debug(f"添加SQL定时执行任务:{name} 执行时间:{run_date}")


def del_schedule(name):
"""删除task"""
try:
sql_schedule = Schedule.objects.get(name=name)
Schedule.delete(sql_schedule)
logger.debug(f'删除task:{name}')
except Schedule.DoesNotExist:
logger.debug(f'删除task:{name}失败,任务不存在')


def task_info(name):
"""获取定时任务详情"""
try:
sql_schedule = Schedule.objects.get(name=name)
return sql_schedule
except Schedule.DoesNotExist:
pass
4 changes: 2 additions & 2 deletions sql/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from sql.engines import get_engine
from common.utils.permission import superuser_required
from sql.engines.models import ReviewResult, ReviewSet
from sql.utils.jobs import job_info
from sql.utils.tasks import task_info

from .models import Users, SqlWorkflow, QueryPrivileges, ResourceGroup, \
QueryPrivilegesApply, Config, SQL_WORKFLOW_CHOICES
Expand Down Expand Up @@ -94,7 +94,7 @@ def detail(request, workflow_id):
# 获取定时执行任务信息
if workflow_detail.status == 'workflow_timingtask':
job_id = Const.workflowJobprefix['sqlreview'] + '-' + str(workflow_id)
job = job_info(job_id)
job = task_info(job_id)
if job:
run_date = job.next_run
else:
Expand Down

0 comments on commit 8bc29b3

Please sign in to comment.