Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #7153: raydata enable/disable auto processing of scans from the GUI #7199

Merged
merged 18 commits into from
Aug 7, 2024
35 changes: 35 additions & 0 deletions sirepo/package_data/static/js/raydata.js
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ SIREPO.app.directive('scansTable', function() {
}

function init() {
getAutomaticAnalysis();
setColumnHeaders();
if (scanService.cachedScans($scope.analysisStatus)) {
loadScans(scanService.cachedScans($scope.analysisStatus));
Expand Down Expand Up @@ -641,6 +642,39 @@ SIREPO.app.directive('scansTable', function() {
scanRequestInterval = $interval(doRequest, 5000);
}

function getAutomaticAnalysis() {
requestSender.sendStatelessCompute(
appState,
json => {
appState.models.runAnalysis.automaticAnalysis = json.data.automaticAnalysis ? 1 : 0;
appState.saveChanges('runAnalysis');
},
{
method: 'get_automatic_analysis',
args: {
catalogName: appState.applicationState().catalog.catalogName
}
},
errorOptions
);
}


function setAutomaticAnalysis() {
requestSender.sendStatelessCompute(
appState,
json => {
},
{
method: 'set_automatic_analysis',
args: {
automaticAnalysis: appState.models.runAnalysis.automaticAnalysis,
catalogName: appState.applicationState().catalog.catalogName }
},
errorOptions
);
}

function setColumnHeaders() {
$scope.columnHeaders = [
...columnsService.defaultColumns($scope.analysisStatus, appState),
Expand Down Expand Up @@ -842,6 +876,7 @@ SIREPO.app.directive('scansTable', function() {
};

$scope.$on(`${$scope.modelName}.changed`, () => sendScanRequest(true, true));
$scope.$on('runAnalysis.changed', setAutomaticAnalysis);
$scope.$on('catalog.changed', () => sendScanRequest(true, true));
$scope.$on('metadataColumns.changed', () => {
sendScanRequest(true);
Expand Down
4 changes: 3 additions & 1 deletion sirepo/package_data/static/json/raydata-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"searchStopTime":["Stop", "DateTimePicker", null],
"searchText": ["Search Text", "OptionalString", "", "Search across all text columns by term.<br /><br />Phrases can be quoted, ex. \"canted mode\".<br /><br />Negations can be added to the search text by adding a minus (-) sign to the word, however negations must be included with another search term, ex. csx -flatfield.<br /><br />ex. csx \"canted mode\" pinhole -flatfield<br /><br />Use the individual search fields below to search for a specific field value. Wildcards (*) may be used with individual field searches, ex. <b>owner</b> xf*"],
"pageSize": ["Page Size", "Integer", 15],
"automaticAnalysis": ["Automatically Run Analysis", "Boolean", "0"],
"scans": ["", "RunAnalysisTable", ""],
"confirmRunAnalysis": ["Hide this message for this session", "Boolean", "0"],
"searchTerms": ["", "SearchTerms", []]
Expand Down Expand Up @@ -118,7 +119,8 @@
"searchTerms"
],
"advanced": [
"pageSize"
"pageSize",
"automaticAnalysis"
]
},
"simulation": {
Expand Down
49 changes: 37 additions & 12 deletions sirepo/raydata/scan_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import sqlalchemy.orm
import zipfile


#: task(s) monitoring the execution of the analysis process
_ANALYSIS_PROCESSOR_TASKS = None

Expand All @@ -43,6 +44,7 @@
# TODO(e-carlin): tune this number
_MAX_NUM_SCANS = 1000


# Fields that come from the top-level of metadata (as opposed to start document).
# Must match key name from _default_columns()
_METADATA_COLUMNS = {"start", "stop", "suid"}
Expand All @@ -57,6 +59,10 @@
#: path scan_monitor registers to receive api requests
_URI = "/scan-monitor"

#: Whether or not new scans should be automatically analyzed (per catalog)
_WANT_AUTOMATIC_ANALYSIS_FOR_CATALOG = PKDict()


_SIM_TYPE = "raydata"

cfg = None
Expand Down Expand Up @@ -434,6 +440,15 @@ def _all_pdfs(rduids):
).raise_for_status()
return PKDict()

def _request_get_automatic_analysis(self, req_data):
return PKDict(
data=PKDict(
automaticAnalysis=_WANT_AUTOMATIC_ANALYSIS_FOR_CATALOG[
req_data.catalogName
]
)
)

def _request_get_scans(self, req_data):
s = 1
if req_data.analysisStatus == "allStatuses":
Expand Down Expand Up @@ -483,7 +498,7 @@ def _request_run_analysis(self, req_data):
def _request_run_engine_event_callback(self, req_data):
# Start as a task. No need to hold request until task is
# completed because the caller does nothing with the response.
asyncio.create_task(
pkasyncio.create_task(
sirepo.raydata.adaptive_workflow.run_engine_event_callback(req_data)
)
return PKDict()
Expand All @@ -497,6 +512,12 @@ def _request_scan_fields(self, req_data):
)
)

def _request_set_automatic_analysis(self, req_data):
_WANT_AUTOMATIC_ANALYSIS_FOR_CATALOG[req_data.catalogName] = bool(
int(req_data.automaticAnalysis)
)
return PKDict(data="ok")

def _sr_authenticate(self, token):
if (
token
Expand Down Expand Up @@ -558,7 +579,7 @@ async def _process_analysis_queue():

assert not _ANALYSIS_PROCESSOR_TASKS
_ANALYSIS_PROCESSOR_TASKS = [
asyncio.create_task(_process_analysis_queue())
pkasyncio.create_task(_process_analysis_queue())
] * cfg.concurrent_analyses
await asyncio.gather(*_ANALYSIS_PROCESSOR_TASKS)

Expand Down Expand Up @@ -596,10 +617,9 @@ def _monitor_catalog(catalog_name):
assert catalog_name not in _CATALOG_MONITOR_TASKS
return asyncio.create_task(_poll_catalog_for_scans(catalog_name))

if not cfg.automatic_analysis:
return
for c in cfg.catalog_names:
_CATALOG_MONITOR_TASKS[c] = _monitor_catalog(c)
_WANT_AUTOMATIC_ANALYSIS_FOR_CATALOG[c] = cfg.automatic_analysis
await asyncio.gather(*_CATALOG_MONITOR_TASKS.values())


Expand Down Expand Up @@ -630,11 +650,19 @@ def _collect_new_scans_and_queue(last_known_scan_metadata):
l = s
return l

async def _poll_for_new_scans(most_recent_scan_metadata):
m = most_recent_scan_metadata
async def _poll_for_new_scans():
while True:
m = _collect_new_scans_and_queue(m)
await pkasyncio.sleep(2)
if not _WANT_AUTOMATIC_ANALYSIS_FOR_CATALOG[catalog_name]:
await asyncio.sleep(2)
continue
s = sirepo.raydata.databroker.get_metadata_for_most_recent_scan(
catalog_name
)
if not _Analysis.have_analyzed_scan(s):
_queue_for_analysis(s)
while _WANT_AUTOMATIC_ANALYSIS_FOR_CATALOG[catalog_name]:
s = _collect_new_scans_and_queue(s)
await pkasyncio.sleep(2)

pkdlog("catalog_name={}", catalog_name)
c = None
Expand All @@ -644,10 +672,7 @@ async def _poll_for_new_scans(most_recent_scan_metadata):
except KeyError:
pkdlog(f"no catalog_name={catalog_name}. Retrying...")
await pkasyncio.sleep(15)
s = sirepo.raydata.databroker.get_metadata_for_most_recent_scan(catalog_name)
if not _Analysis.have_analyzed_scan(s):
_queue_for_analysis(s)
await _poll_for_new_scans(s)
await _poll_for_new_scans()
raise AssertionError("should never get here")


Expand Down
8 changes: 8 additions & 0 deletions sirepo/template/raydata.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def stateless_compute_download_analysis_pdfs(data, data_file_uri=None, **kwargs)
return _request_scan_monitor(PKDict(method="download_analysis_pdfs", data=data))


def stateless_compute_get_automatic_analysis(data, **kwargs):
return _request_scan_monitor(PKDict(method="get_automatic_analysis", data=data))


def stateless_compute_reorder_scan(data, **kwargs):
return _request_scan_monitor(PKDict(method="reorder_scan", data=data))

Expand All @@ -78,6 +82,10 @@ def stateless_compute_scan_fields(data, **kwargs):
return _request_scan_monitor(PKDict(method="scan_fields", data=data))


def stateless_compute_set_automatic_analysis(data, **kwargs):
return _request_scan_monitor(PKDict(method="set_automatic_analysis", data=data))


def _request_scan_monitor(data):
c = sirepo.feature_config.for_sim_type(SIM_TYPE)
try:
Expand Down