forked from robcarver17/pysystemtrade
-
Notifications
You must be signed in to change notification settings - Fork 1
/
mongo_process_control.py
52 lines (36 loc) · 1.77 KB
/
mongo_process_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from sysobjects.production.process_control import controlProcess
from sysdata.production.process_control_data import controlProcessData
from syscore.objects import arg_not_supplied, missing_data
from sysdata.mongodb.mongo_generic import mongoDataWithSingleKey
from syslogdiag.log_to_screen import logtoscreen
PROCESS_CONTROL_COLLECTION = "process_control"
PROCESS_CONTROL_KEY = "process_name"
class mongoControlProcessData(controlProcessData):
"""
Read and write data class to get process control data
"""
def __init__(
self,
mongo_db=arg_not_supplied,
log=logtoscreen("mongoControlProcessData")):
super().__init__(log=log)
self._mongo_data = mongoDataWithSingleKey(PROCESS_CONTROL_COLLECTION, PROCESS_CONTROL_KEY, mongo_db=mongo_db)
@property
def mongo_data(self):
return self._mongo_data
def __repr__(self):
return "Data connection for process control, mongodb %s" % str(self.mongo_data)
def get_list_of_process_names(self):
return self.mongo_data.get_list_of_keys()
def _get_control_for_process_name_without_default(self, process_name):
result_dict = self.mongo_data.get_result_dict_for_key_without_key_value(process_name)
if result_dict is missing_data:
return missing_data
control_object = controlProcess.from_dict(result_dict)
return control_object
def _modify_existing_control_for_process_name(
self, process_name, new_control_object
):
self.mongo_data.add_data(process_name, new_control_object.as_dict(), allow_overwrite=True)
def _add_control_for_process_name(self, process_name, new_control_object):
self.mongo_data.add_data(process_name, new_control_object.as_dict(), allow_overwrite=False)