Skip to content

Commit

Permalink
Fix old dag can not be loaded after dag altered (#59)
Browse files Browse the repository at this point in the history
* Fix old dag can not be loaded after dag altered

* Add auto refresh on result page
  • Loading branch information
dispensable authored Nov 18, 2020
1 parent adbdcb0 commit 54fa3e1
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 9 deletions.
90 changes: 89 additions & 1 deletion frontend/src/components/HTicketDetail.vue
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,18 @@
<a-button @click="onApprove" type="primary">Approve</a-button>
</a-button-group>
<a-button v-show="showResultButton" :style="{ marginLeft: '16px' }" @click="toggleResult">{{resultButtonText}}</a-button>
<a-button v-show="resultVisible" :style="{ marginLeft: '5px' }" @click="loadTResult">Refresh</a-button>

<a-switch v-show="resultVisible" :style="{ marginLeft: '5px' }"
:checked-children="autoRefreshBtnText"
:un-checked-children="autoRefreshBtnText"
:loading="isRefreshing"
:checked="autoRefreshOn"
:disabled="autoRefreshDisabled"
@click="toggleAutoRefresh"
/>
</a-row>
<h-ticket-result :style="{ marginTop: '16px' }" :is-visible="resultVisible" :ticket-id="ticketInfo.id" :ticketProvider="ticketInfo.provider_type"></h-ticket-result>
<h-ticket-result :style="{ marginTop: '16px' }" :is-visible="resultVisible" :ticket-id="ticketInfo.id" ref="ticketResult" :ticketProvider="ticketInfo.provider_type"></h-ticket-result>
</a-card>
</h-base>
</template>
Expand Down Expand Up @@ -128,7 +138,14 @@ export default {
'failed': {'status': 'error', 'stepKey': 4},
'unknown': {'status': 'process', 'stepKey': 4}
},
ticketEndStatus: ['rejected', 'submit_error', 'succeed', 'success', 'failed', 'unknown'],
rejectReason: null,
loadingIntervalId: null,
autoRefreshOn: false,
autoRefreshDisabled: false,
autoRefreshBtnText: 'Auto Refresh OFF',
autoRefreshBtnUpdateTimer: null,
isRefreshing: false,
}
},
computed: {
Expand Down Expand Up @@ -265,6 +282,77 @@ export default {
error.response.data.data.description
)
})
},
loadTResult (e, callback) {
this.loadTickets()
this.$refs.ticketResult.loadResult(callback)
},
isTicketEndStatus() {
const ticketStatus = this.ticketInfo.status
return (ticketStatus
&& this.statusToStepStatus[ticketStatus]
&& this.ticketEndStatus.includes(ticketStatus))
},
notifyFinishedTicket() {
this.$notification.open({
message: 'This ticket is in final status',
description: 'Ticket has been marked as final status, refresh is invailed.',
duration: 0
})
},
toggleAutoRefresh (checked) {
if (checked && !this.autoRefreshOn) {
this.autoRefreshOn = true
this.autoRefreshBtnText = "Auto Refresh On"
const interval = 10000
const maxAutoRefreshTry = 360 // 1 hour
let tried = 0
let passedTime = 0
this.loadingIntervalId = setInterval(() => {
// prevent muiltiple refresh
if (this.isRefreshing) {
this.autoRefreshBtnText = 'Refreshing ...'
return
}
// update clockdown in switch
if (passedTime <= interval) {
this.autoRefreshBtnText = "Refresh after " + (interval - passedTime) / 1000 + " s ..."
passedTime += 1000
} else {
this.isRefreshing = true
tried += 1
this.loadTResult(null, (isRefreshGoing, resp, isError) => {
this.isRefreshing = isRefreshGoing
if (isError || this.isTicketEndStatus() || tried > maxAutoRefreshTry) {
clearInterval(this.loadingIntervalId)
const finished = this.isTicketEndStatus()
const isMaxRefreshExceeded = tried > maxAutoRefreshTry
this.autoRefreshBtnText = finished ? "Finished" : (isMaxRefreshExceeded ? "Max Refresh Try Exceeded" : "Error")
if (finished) {
this.notifyFinishedTicket()
} else {
this.$notification.open({
message: isMaxRefreshExceeded ? 'Auto Refresh tried exceeded ' + maxAutoRefreshTry + ' times' : 'Ticket result load error',
description: isMaxRefreshExceeded ? '' : resp.description,
duration: 0
})
}
this.autoRefreshOn = false
this.autoRefreshDisabled = isMaxRefreshExceeded ? false : true
this.isRefreshing = false
}
} )
passedTime = 0
}
}, 1000)
} else {
clearInterval(this.loadingIntervalId)
this.autoRefreshBtnText = "Auto Refresh OFF"
this.autoRefreshOn = false
}
}
},
mounted () {
Expand Down
15 changes: 13 additions & 2 deletions frontend/src/components/HTicketResult.vue
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export default {
dataLoaded: false,
currentComponent: 'ResultHostTable',
dagComponent: 'Hdag',
spinning: true,
spinning: false,
isDagAvaliable: false,
diagramData: {
nodeDataArray: [],
Expand All @@ -44,15 +44,26 @@ export default {
}
},
methods: {
loadResult () {
loadResult (callback) {
if (this.spinning) {
console.warn("Data is loading already, please wait a moment and retry!")
return
}
this.spinning = true
this.dataLoaded = false
HRequest.get('/api/ticket/' + this.ticketId + '/result').then(
(response) => {
this.handleResult(response.data.data)
if (callback) {
callback(this.spinning, response.data.data, false)
}
}
).catch((error) => {
this.$message.error('Get result error: ' + error.response.data.data.description, 10)
this.spinning = false
if (callback) {
callback(this.spinning, error.response.data.data, true)
}
})
// this.handleResult({'sa': {'failed': true, 'succeeded': false, 'description': 'farly long description'}})
},
Expand Down
37 changes: 31 additions & 6 deletions helpdesk/models/provider/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@
logger = logging.getLogger(__name__)


class FakeTaskInstance:
""" Fake task instance for
if new dag def update node in the dag, old result will not contain this node
"""
def __init__(self):
self.state = 'no_status'
self.try_number = 0

def __getitem__(self, item):
return getattr(self, item, None)


AIRFLOW_FAKE_TI = FakeTaskInstance()


class AirflowProvider(BaseProvider):
provider_type = 'airflow'

Expand Down Expand Up @@ -173,9 +188,14 @@ def _dag_graph_to_gojs_flow(self, dag_id, execution):
if airflow_graph:
for node in airflow_graph['nodes']:
ti = execution['task_instances'].get(node['id'])
state = ti['state']
if state in status_filter:
continue
if not ti:
# if new dag def update node in the dag, old result will not contain this node
logger.warning(f"task_id {node['id']} can not be found in task_instance")
state = 'no_status'
else:
state = ti['state']
if state in status_filter:
continue
result['nodeDataArray'].append({
'key': node['id'],
'text': f'{self._format_exec_status(state)} {node["value"]["label"]}',
Expand All @@ -186,8 +206,12 @@ def _dag_graph_to_gojs_flow(self, dag_id, execution):
for edge in airflow_graph['edges']:
ti_v = execution['task_instances'].get(edge['v'])
ti_u = execution['task_instances'].get(edge['u'])
if any((ti_u['state'] in status_filter, ti_v['state'] in status_filter)):
continue
if not ti_v or not ti_u:
# if new dag def update node in the dag, old result will not contain this node
logger.warning(f"task instance can not be found in task_instance")
else:
if any((ti_u['state'] in status_filter, ti_v['state'] in status_filter)):
continue
result['linkDataArray'].append({
'to': edge['v'],
'from': edge['u']
Expand Down Expand Up @@ -220,7 +244,8 @@ def _build_result_from_dag_exec(self, execution, execution_id):
if not task_instance:
# if new dag def update node in the dag, old result will not contain this node
logger.warning(f"task_id {task_id} can not be found in task_instance")
continue
# fake task instance
task_instance = AIRFLOW_FAKE_TI
# do not show specific status node
if status_filter and task_instance['state'] in status_filter:
continue
Expand Down

0 comments on commit 54fa3e1

Please sign in to comment.