-
Notifications
You must be signed in to change notification settings - Fork 31
/
loader.py
189 lines (154 loc) · 5.76 KB
/
loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# -*- coding: utf-8 -*-
"""Script for migrating data from various sources into the database."""
from argparse import ArgumentParser
from configparser import ConfigParser
from functools import partial
from typedb.api.connection.credential import TypeDBCredential
from typedb.client import SessionType, TypeDB
from loader.coronavirus.coronavirus_loader import load_coronavirus
from loader.dgidb.dgidb_loader import load_dgibd
from loader.disgenet.disgenet_loader import load_disgenet
from loader.hpa.hpa_loader import load_hpa
from loader.reactome.reactome_loader import load_reactome
from loader.semmed.pipeline import load_semmed
from loader.tissuenet.tissuenet_loader import load_tissuenet
from loader.uniprot.uniprot_loader import load_uniprot
from schema.initialise import initialise_database
def parse_config_args(filepath):
config = ConfigParser(allow_no_value=True)
config.read(filepath)
args = dict()
for section in config.sections():
args[section] = dict()
for option in config.options(section):
value = config.get(section, option)
if value == "":
value = None
args[section][option] = value
return args
def command_parser():
"""Parse the command line arguments."""
parser = ArgumentParser(
description="Defines typedb-bio database and inserts data by calling separate loader scripts.",
)
parser.add_argument(
"-c",
"--config_path",
type=str,
default="config.ini",
help="Path for config file. Command line arguments override file arguments. (default: \"config.ini\")",
)
parser.add_argument(
"-d",
"--database",
type=str,
help="Database name.",
)
parser.add_argument(
"-a",
"--address",
type=str,
help="Server address and port.",
)
parser.add_argument(
"-s",
"--server_type",
type=str,
choices=["CORE", "ENTERPRISE", "CLOUD"],
help="TypeDB server product in use, \"CORE\", \"ENTERPRISE\", or \"CLOUD\".",
)
parser.add_argument(
"-u",
"--username",
type=str,
help="Username for TypeDB Enterprise and Cloud.",
)
parser.add_argument(
"-p",
"--password",
type=str,
help="Password for TypeDB Enterprise and Cloud. Cannot be supplied via config file.",
)
parser.add_argument(
"-t",
"--tls_cert_path",
type=str,
help="TLS certificate path for TypeDB Enterprise and Cloud.",
)
parser.add_argument(
"-o",
"--overwrite",
type=bool,
help="Overwrite existing database with the same name.",
)
parser.add_argument(
"-b",
"--commit_batch",
type=int,
help="Number of insert queries per commit.",
)
parser.add_argument(
"-n",
"--num_jobs",
type=int,
help="Maximum number of concurrently running jobs.",
)
return parser
def parse_args():
command_args = vars(command_parser().parse_args())
config_args = parse_config_args(command_args.pop("config_path"))
bool_args = [
"overwrite",
]
int_args = [
"max_proteins",
"max_viruses",
"max_pathways",
"max_diseases",
"max_drugs",
"max_drug_interactions",
"max_tissues",
"max_publications",
"max_protein_interactions",
"commit_batch",
"num_jobs",
]
args = dict()
for section in config_args:
for arg in config_args[section]:
if config_args[section][arg] is None:
args[arg] = None
elif arg in bool_args:
args[arg] = bool(config_args[section][arg])
elif arg in int_args:
args[arg] = int(config_args[section][arg])
else:
args[arg] = config_args[section][arg]
for arg in command_args:
if arg == "password" or command_args[arg] is not None:
args[arg] = command_args[arg]
return args
if __name__ == "__main__":
args = parse_args()
if args["server_type"].lower() == "core":
client_partial = partial(TypeDB.core_client, address=args["address"])
elif args["server_type"].lower() in ["enterprise", "cloud"]:
credential = TypeDBCredential(username=args["username"], password=args["password"], tls_root_ca_path=args["tls_cert_path"])
client_partial = partial(TypeDB.cluster_client, addresses=args["address"], credential=credential)
else:
raise ValueError("Unknown server type. Must be \"CORE\", \"ENTERPRISE\", or \"CLOUD\".")
print("Welcome to the TypeDB Bio database loader!")
print("--------------------------------------------------")
with client_partial() as client:
initialise_database(client, args["database"], args["overwrite"])
with client.session(args["database"], SessionType.DATA) as session:
load_uniprot(session, args["max_proteins"], args["num_jobs"], args["commit_batch"])
load_coronavirus(session, args["max_viruses"], args["num_jobs"], args["commit_batch"])
load_reactome(session, args["max_pathways"], args["num_jobs"], args["commit_batch"])
load_disgenet(session, args["max_diseases"], args["num_jobs"], args["commit_batch"])
load_dgibd(session, args["max_drugs"], args["max_drug_interactions"], args["num_jobs"], args["commit_batch"])
load_hpa(session, args["max_tissues"], args["num_jobs"], args["commit_batch"])
load_semmed(session, args["max_publications"], args["num_jobs"], args["commit_batch"])
load_tissuenet(session, args["max_protein_interactions"], args["num_jobs"], args["commit_batch"])
print("All data loaded.")
print("Goodbye!")