Change Case and User Ownership (Script) #34588

Open
wants to merge 70 commits into
base: master
Changes from 66 commits
3cc7f6f
function for changing user location
zandre-eng May 9, 2024
006a266
function for changing case ownership
zandre-eng May 9, 2024
ff5cdae
bulk save users
zandre-eng May 9, 2024
b47f557
update owner id for cases
zandre-eng May 9, 2024
3d045ad
catch error retrieving non commcare user
zandre-eng May 9, 2024
2b80d2e
fix func name overshadowing import
zandre-eng May 9, 2024
c35c65d
minor refactoring
zandre-eng May 9, 2024
975cd7f
remove test code
zandre-eng May 10, 2024
bf3b264
refactor all funcs into classes
zandre-eng May 10, 2024
d777a43
add todos
zandre-eng May 10, 2024
4a9d929
fix logging func writing empty lines if no msg
zandre-eng May 10, 2024
51734c4
func to load all relevant case ids to file
zandre-eng May 10, 2024
a6d54c5
create sqlite db manager class + remove text logging
zandre-eng May 17, 2024
d436a14
refactor user class to use db manager
zandre-eng May 17, 2024
3aaa66f
refactor case class to use db manager
zandre-eng May 17, 2024
25e153b
custom table names for case/user updater
zandre-eng May 17, 2024
c87c4b8
correctly flatten ids
zandre-eng May 17, 2024
192b6ee
correctly create tuple
zandre-eng May 17, 2024
ce348c4
wrap user doc into user obj
zandre-eng May 17, 2024
0f092e1
fix incorrect col ref
zandre-eng May 21, 2024
1760f47
retrieve user id
zandre-eng May 21, 2024
c10d783
define var outside of conditional
zandre-eng May 21, 2024
01c4f7c
check for same location before querying loc
zandre-eng May 21, 2024
559dfad
set case updater to only reference a single case type
zandre-eng May 21, 2024
a000ce2
minor logging refactor
zandre-eng May 21, 2024
5888d2f
lint
mkangia May 31, 2024
c817a30
move update to individual commands
mkangia Jun 2, 2024
e4e7b5a
fetch cases from one shard at once
mkangia Jun 2, 2024
4cb9da3
add warning
mkangia Jun 2, 2024
949d47c
reduce class variables
mkangia Jun 3, 2024
8f050f7
fetch only active mobile workers
mkangia Jun 3, 2024
7875743
keep it simple and have chunk size only
mkangia Jun 3, 2024
aba0953
reorder
mkangia Jun 3, 2024
3e6ce37
minor naming update
mkangia Jun 3, 2024
f6a371a
remove redundant method
mkangia Jun 3, 2024
fabe95a
exit with message
mkangia Jun 3, 2024
9288792
fix naming
mkangia Jun 3, 2024
e1e7c63
store updated id as well
mkangia Jun 3, 2024
136cae3
load owner updates from user updates
mkangia Jun 3, 2024
cae7288
setup fresh migration command with approach
mkangia Jun 5, 2024
26498e9
add structure for new migration
mkangia Jun 5, 2024
81f1849
find child location with name
mkangia Jun 5, 2024
16bcbb8
find users at location
mkangia Jun 5, 2024
886d3d2
find case ids to update
mkangia Jun 5, 2024
5645fc7
update cases and users
mkangia Jun 5, 2024
9d053f8
add logging
mkangia Jun 5, 2024
768994c
log update completion for each user
mkangia Jun 5, 2024
bc7708a
update one case type at a time
mkangia Jun 5, 2024
9625ab1
respect dry run option
mkangia Jun 5, 2024
ceb0992
add option to update for only one village at once
mkangia Jun 5, 2024
5bf510b
log errors separately
mkangia Jun 5, 2024
80dc9f7
correct file mode for append
mkangia Jun 5, 2024
6c16dee
remove all redundant commands
mkangia Jun 5, 2024
26e3e0d
nit: added a couple of logs
ajeety4 Jun 20, 2024
e11aa9f
fix - location codes are lowercase
ajeety4 Jun 20, 2024
ff44777
fix - use get_user_data() instead of user_data
ajeety4 Jun 20, 2024
c4c23bc
fix-case block as text
ajeety4 Jun 20, 2024
80fc2c7
nit: logs
ajeety4 Jun 20, 2024
130b843
add custom.benin to installed apps
ajeety4 Jun 24, 2024
3154c6f
minor updates: execution time, device id and logs
ajeety4 Jun 24, 2024
c7e2cdb
unset exsiting location
ajeety4 Jun 27, 2024
cb455b5
nit: use . instead of : in file name for scp support
ajeety4 Jun 27, 2024
0e160f1
nit: avoid printing progress bar for 0 records
ajeety4 Jul 1, 2024
9fc9cc1
use python logging
ajeety4 Jul 1, 2024
ed4e5e8
adds option to run in celery
ajeety4 Jul 1, 2024
2098d59
move script location
zandre-eng Jul 4, 2024
1ca177a
update queue + remove unused script file
zandre-eng Jul 4, 2024
d799f67
optional fetch villages by commune
zandre-eng Jul 8, 2024
2056184
additional user stats in logging
zandre-eng Jul 10, 2024
e7ab359
Comment out assertion
Charl1996 Jul 24, 2024
Empty file added custom/benin/__init__.py
Empty file.
342 changes: 342 additions & 0 deletions custom/benin/change_ownership.py
@@ -0,0 +1,342 @@
# this has been replaced with custom/benin/management/commands/migrate_users_and_their_cases_to_new_rc_level.py
# Review thread on the line above: zandre-eng marked this conversation as resolved.
import math
import os
from datetime import datetime
import itertools
import sqlite3

from casexml.apps.case.mock import CaseBlock
from dimagi.utils.chunked import chunked
from dimagi.utils.couch.database import iter_docs

from corehq.apps.es import UserES
from corehq.apps.hqcase.utils import submit_case_blocks
from corehq.apps.locations.models import SQLLocation
from corehq.apps.users.models import CommCareUser
from corehq.form_processor.models import CommCareCase
from corehq.sql_db.util import paginate_query_across_partitioned_databases
from corehq.util.log import with_progress_bar

from django.db.models import Q

current_time = datetime.now().time()
db_file_path = os.path.expanduser('~/script_items_to_process.db')


class Updater(object):
    chunk_size = 100  # Could potentially increase this
    batch_size = 20000  # Maximum number of cases to process in a single script run
    domain = 'alafiacomm'
    stat_counts = {
        'success': 0,
        'skipped': 0,
        'failed': 0,
    }
    db_table_name = 'default'

    def __init__(self):
        self.db_manager = DBManager(self.db_table_name)

class DBManager(object):

    STATUS_PENDING = 'pending'
    STATUS_SUCCESS = 'success'
    STATUS_FAILURE = 'failure'
    STATUS_SKIPPED = 'skipped'

    VALID_STATUS = [
        STATUS_PENDING,
        STATUS_SUCCESS,
        STATUS_FAILURE,
        STATUS_SKIPPED,
    ]

    def __init__(self, table_name):
        self.table_name = table_name

    def _get_db_cur(self):
        con = sqlite3.connect(db_file_path)
        return con.cursor()

    # Flake8 check failure, line 61: Blank line contains whitespace (W293)
    def setup_db(self):
        cur = self._get_db_cur()
        cur.execute(f"CREATE TABLE {self.table_name} (id, revert_id, status, message)")
        cur.connection.commit()
        cur.close()

    def create_row(self, id):
        # TODO: Catch status that's not pending?
        cur = self._get_db_cur()
        cur.execute(f"INSERT INTO {self.table_name} VALUES (?, ?, ?, ?)", (id, '', self.STATUS_PENDING, ''))
        cur.connection.commit()
        cur.close()

    def get_ids(self, count):
        cur = self._get_db_cur()
        res = cur.execute(
            "SELECT id FROM {} WHERE status IN ('{}', '{}')".format(
                self.table_name, self.STATUS_PENDING, self.STATUS_FAILURE
            )
        )
        ids = res.fetchmany(count)
        cur.close()
        flattened_ids = list(itertools.chain.from_iterable(ids))
        return flattened_ids

    def update_row(self, id, value_dict):
        """
        value_dict: Has following format:
        {
            'col_name': 'col_val',
            ...
        }
        Valid column names are revert_id, status, message
        """
        # Bind the values as parameters so that a quote inside a value
        # (for example in a message) cannot break the statement
        expr_list = [f"{key} = ?" for key in value_dict]
        query = f"UPDATE {self.table_name} SET {', '.join(expr_list)} WHERE id = ?"

        cur = self._get_db_cur()
        cur.execute(query, (*value_dict.values(), id))
        cur.connection.commit()
        cur.close()


class UserUpdater(Updater):
    rc_num_prop_name = 'rc_number'
    user_type_prop_name = 'usertype'
    db_table_name = 'user_list'

    def store_all_user_ids(self):
        """
        Store all users for later processing. This is useful mainly to store revert_id,
        which can be used to revert changes made. This should be run at the start.
        """
        user_ids = (
            UserES()
            .domain(self.domain)
            .mobile_users()
        ).get_ids()
        for user_id in user_ids:
            self.db_manager.create_row(user_id)

    def start(self, dry_run=False):
        # TODO: Implement code to reverse actions if needed

        print("---MOVING MOBILE WORKER LOCATIONS---")
        # This does not seem right? We should be fetching all ids here?
        user_ids = self.db_manager.get_ids(self.batch_size)
        user_count = len(user_ids)
        # Chunks are cut by chunk_size; batch_size only caps how many IDs are fetched per run
        chunk_count = math.ceil(user_count / self.chunk_size)
        print(f"Total Users to Process: {user_count}")
        print(f"Total Chunks to Process: {chunk_count}")
        user_gen = iter_docs(CommCareUser.get_db(), user_ids)
        # The loop iterates over chunks, so the progress bar length is the chunk count
        user_chunks = chunked(user_gen, self.chunk_size)
        for user_chunk in with_progress_bar(user_chunks, length=chunk_count, oneline=False):
            users_to_save, reverse_ids = self._process_chunk(user_chunk)
            is_success = True
            if not dry_run:
                try:
                    CommCareUser.bulk_save(users_to_save)
                except Exception as e:
                    # Flake8 check failure, line 144: Local variable 'e' is assigned to but never used (F841)
                    is_success = False
            for user in users_to_save:
                if is_success:
                    self.db_manager.update_row(
                        user.user_id,
                        value_dict={
                            'status': self.db_manager.STATUS_SUCCESS,
                            'revert_id': reverse_ids[user.user_id],
                        }
                    )
                else:
                    self.db_manager.update_row(
                        user.user_id,
                        value_dict={
                            'status': self.db_manager.STATUS_FAILURE,
                            'revert_id': reverse_ids[user.user_id],  # Just in case some users were saved
                            'message': 'Failed to save user in bulk save',
                        }
                    )

        print("Processing Users Complete!")
        # Flake8 check failures, lines 167 and 168: The backslash is redundant between brackets (E502)
        print(
            f"Success: {self.stat_counts['success']}, " \
            f"Failed: {self.stat_counts['failed']}, " \
            f"Skipped: {self.stat_counts['skipped']}"
        )

    def _process_chunk(self, user_chunk):
        users_to_save = []
        reverse_ids = {}
        for user in user_chunk:
            user_obj = CommCareUser.wrap(user)
            user_data = user_obj.get_user_data(self.domain)

            # First make sure that the user type is rc
            if (
                self.user_type_prop_name not in user_data
                or user_data[self.user_type_prop_name] != "rc"
            ):
                self.db_manager.update_row(
                    user_obj.user_id,
                    # Flake8 check failure, line 185: Trailing whitespace (W291)
                    value_dict={
                        'status': self.db_manager.STATUS_SKIPPED,
                        'message': 'User Type not RC',
                    }
                )
                self.stat_counts['skipped'] += 1
                continue

            if user_obj.location and user_obj.location.name == user_data[self.rc_num_prop_name]:
                # Skip and don't update user if already at location
                self.db_manager.update_row(
                    user_obj.user_id,
                    value_dict={
                        'status': self.db_manager.STATUS_SKIPPED,
                        'message': f'Skipped as user already at RC location with ID {user_obj.location.location_id}'
                        # Flake8 check failure, line 200: Line too long (116 > 115 characters) (E501)
                    }
                )
                self.stat_counts['skipped'] += 1
                continue

            try:
                # Get a descendant of user location which has the same rc number
                loc = SQLLocation.objects.get(
                    domain=self.domain,
                    parent__location_id=user_obj.location_id,
                    name=user_data[self.rc_num_prop_name]
                )
            except SQLLocation.DoesNotExist as e:
                # Flake8 check failure, line 213: Local variable 'e' is assigned to but never used (F841)
                self.db_manager.update_row(
                    user_obj.user_id,
                    value_dict={
                        'status': self.db_manager.STATUS_FAILURE,
                        'message': f'({user_data[self.rc_num_prop_name]}) does not exist as child of location with id ({user_obj.location_id})'
                        # Flake8 check failure, line 218: Line too long (143 > 115 characters) (E501)
                    }
                )
                self.stat_counts['failed'] += 1
                continue

            reverse_ids[user_obj.user_id] = user_obj.location_id
            user_obj.location_id = loc.location_id
            self.stat_counts['success'] += 1
            users_to_save.append(user_obj)

        return users_to_save, reverse_ids


class CaseUpdater(Updater):
    device_id = 'system'
    db_table_name_root = 'case_list'

    def __init__(self, case_type):
        """
        case_type: Should be one of [menage, membre, seance_educative, fiche_pointage]
        """
        self.case_type = case_type
        super(CaseUpdater, self).__init__()

    @property
    def db_table_name(self):
        return f'{self.db_table_name_root}_{self.case_type}'

    def _fetch_case_ids(self):
        query = Q(domain=self.domain) & Q(type=self.case_type)
        for row in paginate_query_across_partitioned_databases(CommCareCase, query, values=['case_id'], load_source='all_case_ids'):
            # Flake8 check failure, line 249: Line too long (132 > 115 characters) (E501)
            yield row[0]

    def store_all_case_ids(self):
        """
        Fetch all relevant case IDs and store them in a SQLite database for later
        processing. This should be run once at the start so that we have all the case
        IDs to process. We will retrieve IDs from this DB in chunks
        """
        # Flake8 check failure, line 254 (first docstring line): Trailing whitespace (W291)
        for id in self._fetch_case_ids():
            self.db_manager.create_row(id)

    def _submit_cases(self, case_blocks):
        submit_case_blocks(
            [cb.as_text() for cb in case_blocks],
            domain=self.domain,
            device_id=self.device_id,
        )

    def start(self, dry_run=False):
        # TODO: Implement code to reverse actions if needed

        print("---MOVING CASE OWNERSHIP---")
        case_ids = self.db_manager.get_ids(self.batch_size)
        case_count = len(case_ids)
        # Chunks are cut by chunk_size; batch_size only caps how many IDs are fetched per run
        chunk_count = math.ceil(case_count / self.chunk_size)
        print(f'Total Cases to Process: {case_count}')
        print(f'Total Chunks to Process: {chunk_count}')
        case_gen = CommCareCase.objects.iter_cases(case_ids, domain=self.domain)
        # The loop iterates over chunks, so the progress bar length is the chunk count
        case_chunks = chunked(case_gen, self.chunk_size)
        for case_chunk in with_progress_bar(case_chunks, length=chunk_count, oneline=False):
            cases_to_save, reverse_ids = self._process_chunk(case_chunk)
            if not dry_run:
                self._submit_cases(cases_to_save)
            for case_obj in cases_to_save:
                self.db_manager.update_row(
                    case_obj.case_id,
                    value_dict={
                        'status': self.db_manager.STATUS_SUCCESS,
                        'revert_id': reverse_ids[case_obj.case_id],
                    }
                )
                self.stat_counts['success'] += 1

        print("All Cases Done Processing!")
        # Flake8 check failures, lines 294 and 295: The backslash is redundant between brackets (E502)
        print(
            f"Successful: {self.stat_counts['success']}, " \
            f"Failed: {self.stat_counts['failed']}, " \
            f"Skipped: {self.stat_counts['skipped']}"
        )

    def _process_chunk(self, case_chunk):
        cases_to_save = []
        reverse_ids = {}
        for case_obj in case_chunk:
            try:
                user = CommCareUser.get_by_user_id(case_obj.opened_by)
            except CommCareUser.AccountTypeError as e:
                # Flake8 check failure, line 305: Local variable 'e' is assigned to but never used (F841)
                self.db_manager.update_row(
                    case_obj.case_id,
                    value_dict={
                        'status': self.db_manager.STATUS_SKIPPED,
                        'message': 'Not owned by a mobile worker',
                    }
                )
                self.stat_counts['skipped'] += 1
                continue
            # Flake8 check failure, line 315: Blank line contains whitespace (W293)

            if user.location_id == case_obj.owner_id:
                # Skip and don't update case if already owned by location
                self.db_manager.update_row(
                    case_obj.case_id,
                    value_dict={
                        'status': self.db_manager.STATUS_SKIPPED,
                        'message': 'Already owned by correct location',
                    }
                )
                self.stat_counts['skipped'] += 1
                continue

            case_block = CaseBlock(
                create=False,
                case_id=case_obj.case_id,
                owner_id=user.location_id,
            )
            reverse_ids[case_obj.case_id] = case_obj.owner_id
            cases_to_save.append(case_block)

        return cases_to_save, reverse_ids

# run this by using the following steps
# case_updater = CaseUpdater('test')
# case_updater.db_manager.setup_db()
# case_updater.store_all_case_ids()
# case_updater.start()
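
Review note: the flow in this file (store IDs in SQLite, fetch a batch of pending or failed IDs, process them in chunks, record a status and a revert ID per row) can be tried in isolation without CommCare HQ. The sketch below is only an illustration under stated assumptions: `run_batch`, `move_user`, the local `chunked` helper and the in-memory database are hypothetical stand-ins, not code from this PR. It binds every value as a query parameter, so a message containing quotes is stored safely, and it derives the chunk count from the chunk size.

```python
import itertools
import math
import sqlite3

PENDING, SUCCESS, FAILURE = "pending", "success", "failure"


def chunked(items, size):
    """Yield tuples of at most `size` items (hypothetical stand-in for dimagi's chunked)."""
    it = iter(items)
    while chunk := tuple(itertools.islice(it, size)):
        yield chunk


def run_batch(con, table, process, batch_size=20000, chunk_size=100):
    """Process up to batch_size pending/failed ids in chunks, recording a status per id.

    Table names cannot be bound as parameters, so `table` must be a trusted constant.
    Returns the number of chunks in this batch.
    """
    rows = con.execute(
        f"SELECT id FROM {table} WHERE status IN (?, ?) LIMIT ?",
        (PENDING, FAILURE, batch_size),
    ).fetchall()
    ids = [row[0] for row in rows]
    chunk_count = math.ceil(len(ids) / chunk_size)
    for chunk in chunked(ids, chunk_size):
        for item_id in chunk:
            try:
                revert_id = process(item_id)
                values = (revert_id, SUCCESS, "", item_id)
            except Exception as exc:
                values = ("", FAILURE, str(exc), item_id)
            # All values are bound, so quotes inside a message are harmless
            con.execute(
                f"UPDATE {table} SET revert_id = ?, status = ?, message = ? WHERE id = ?",
                values,
            )
        con.commit()  # one commit per chunk
    return chunk_count


# In-memory database with the same four columns the script creates
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE user_list (id, revert_id, status, message)")
con.executemany(
    "INSERT INTO user_list VALUES (?, '', ?, '')",
    [(f"user-{n}", PENDING) for n in range(250)],
)


def move_user(user_id):
    """Hypothetical update step: returns the value needed to revert, or raises."""
    if user_id == "user-7":
        raise ValueError("location 'RC 7' does not exist")
    return f"old-location-of-{user_id}"


chunks = run_batch(con, "user_list", move_user)
counts = dict(con.execute("SELECT status, COUNT(*) FROM user_list GROUP BY status"))
print(chunks, counts)  # 3 chunks; 249 rows marked success, 1 marked failure
```

Because failed rows keep the `failure` status, calling `run_batch` again picks them up, which mirrors how `get_ids` selects both pending and failed rows.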
Empty file.
Empty file.