forked from hotosm/tasking-manager
-
Notifications
You must be signed in to change notification settings - Fork 0
/
manage.py
135 lines (110 loc) · 4.25 KB
/
manage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import warnings
import base64
import csv
import datetime
from flask_migrate import MigrateCommand
from flask_script import Manager
from dotenv import load_dotenv
from backend import create_app, initialise_counters
from backend.services.users.authentication_service import AuthenticationService
from backend.services.users.user_service import UserService
from backend.services.stats_service import StatsService
from backend.services.interests_service import InterestService
from backend.models.postgis.utils import NotFound
from backend.models.postgis.task import Task, TaskHistory
from sqlalchemy import func
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
# Load configuration from file into environment
load_dotenv(os.path.join(os.path.dirname(__file__), "tasking-manager.env"))
# Check that required environmental variables are set
for key in [
"TM_APP_BASE_URL",
"POSTGRES_DB",
"POSTGRES_USER",
"POSTGRES_PASSWORD",
"TM_SECRET",
"TM_CONSUMER_KEY",
"TM_CONSUMER_SECRET",
"TM_DEFAULT_CHANGESET_COMMENT",
]:
if not os.getenv(key):
warnings.warn("%s environmental variable not set." % (key,))
# Initialise the flask app object
application = create_app()
# Initialize homepage counters
try:
initialise_counters(application)
except Exception:
warnings.warn("Homepage counters not initialized.")
# Add management commands
manager = Manager(application)
# Enable db migrations to be run via the command line
manager.add_command("db", MigrateCommand)
# Job runs once every 2 hours
@manager.command
def auto_unlock_tasks():
with application.app_context():
# Identify distinct project IDs that were touched in the last 2 hours
query = (
TaskHistory.query.with_entities(TaskHistory.project_id)
.filter(
func.DATE(TaskHistory.action_date)
> datetime.datetime.utcnow() - datetime.timedelta(minutes=130)
)
.distinct()
)
projects = query.all()
# For each project update task history for tasks that were not manually unlocked
for project in projects:
project_id = project[0]
Task.auto_unlock_tasks(project_id)
# Setup a background cron job
cron = BackgroundScheduler(daemon=True)
# Initiate the background thread
cron.add_job(auto_unlock_tasks, "interval", hours=2)
cron.start()
application.logger.debug("Initiated background thread to auto unlock tasks")
# Shutdown your cron thread when the application is stopped
atexit.register(lambda: cron.shutdown(wait=False))
@manager.option("-u", "--user_id", help="Test User ID")
def gen_token(user_id):
""" Helper method for generating valid base64 encoded session tokens """
token = AuthenticationService.generate_session_token_for_user(user_id)
print(f"Raw token is: {token}")
b64_token = base64.b64encode(token.encode())
print(f"Your base64 encoded session token: {b64_token}")
@manager.command
def refresh_levels():
print("Started updating mapper levels...")
users_updated = UserService.refresh_mapper_level()
print(f"Updated {users_updated} user mapper levels")
@manager.command
def refresh_project_stats():
print("Started updating project stats...")
StatsService.update_all_project_stats()
print("Project stats updated")
@manager.command
def update_project_categories(filename):
with open(filename, "r", encoding="ISO-8859-1", newline="") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
project_id = int(row.get("projectId"))
primary_category = row.get("primaryCat")
interest_ids = []
# Map only primary_category interest to projects
if primary_category:
try:
interest = InterestService.get_by_name(primary_category)
except NotFound:
interest = InterestService.create(primary_category)
interest_ids.append(interest.id)
try:
InterestService.create_or_update_project_interests(
project_id, interest_ids
)
except Exception as e:
print(f"Problem updating {project_id}: {type(e)}")
if __name__ == "__main__":
manager.run()