Skip to content

Commit

Permalink
Create func to check for too many devices
Browse files Browse the repository at this point in the history
  • Loading branch information
gherceg committed Dec 12, 2024
1 parent 0c6d4b1 commit 6d87fb4
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 2 deletions.
1 change: 1 addition & 0 deletions corehq/apps/ota/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DEVICES_PER_USER = 50
99 changes: 97 additions & 2 deletions corehq/apps/ota/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import uuid
from datetime import datetime
from unittest.mock import patch

from django.test import TestCase

from casexml.apps.phone.models import OTARestoreCommCareUser, OTARestoreWebUser
from freezegun import freeze_time

from casexml.apps.phone.models import OTARestoreCommCareUser, OTARestoreWebUser, SyncLogSQL
from corehq.apps.domain.models import Domain
from corehq.apps.locations.tests.util import LocationHierarchyTestCase
from corehq.apps.ota.utils import get_restore_user, is_permitted_to_restore
from corehq.apps.ota.utils import get_restore_user, is_permitted_to_restore, can_login_on_device
from corehq.apps.users.dbaccessors import delete_all_users
from corehq.apps.users.models import CommCareUser, WebUser
from corehq.apps.users.util import format_username
Expand Down Expand Up @@ -425,3 +429,94 @@ def test_get_restore_user_as_user_for_commcare_user(self):
)
self.assertEqual(user.user_id, self.other_commcare_user._id)
self.assertEqual(user.request_user_id, self.commcare_user.user_id)


@freeze_time("2024-12-10 12:00:00")
class CanLoginOnDeviceTest(TestCase):

@classmethod
def setUpClass(cls):
super().setUpClass()
cls.domain = 'devices-per-user-test'
cls.within_past_day = datetime(2024, 12, 10, 0, 0)
cls.over_one_day_ago = datetime(2024, 12, 9, 0, 0)

def test_allowed_if_device_count_equals_limit_but_existing_device_id(self):
self._create_synclog(self.domain, 'abc123', 'device-id', date=self.within_past_day)

with patch('corehq.apps.ota.utils.DEVICES_PER_USER', 1):
self.assertTrue(can_login_on_device('abc123', 'device-id'))

def test_not_allowed_if_device_count_equals_limit_and_new_device_id(self):
self._create_synclog(self.domain, 'abc123', 'device-id', date=self.within_past_day)

with patch('corehq.apps.ota.utils.DEVICES_PER_USER', 1):
self.assertFalse(can_login_on_device('abc123', 'new-device-id'))

def test_allowed_if_device_count_is_under_limit(self):
self._create_synclog(self.domain, 'abc123', 'device-id', date=self.within_past_day)

with patch('corehq.apps.ota.utils.DEVICES_PER_USER', 2):
self.assertTrue(can_login_on_device('abc123', 'new-device-id'))

def test_web_apps_logins_are_always_allowed(self):
for i in range(3):
self._create_synclog(self.domain, 'abc123', f'WebAppsLogin*{i}', date=self.within_past_day)

with patch('corehq.apps.ota.utils.DEVICES_PER_USER', 1):
self.assertTrue(can_login_on_device('abc123', 'WebAppsLogin*newlogin'))

def test_web_apps_logins_do_not_count_towards_device_count(self):
for i in range(3):
self._create_synclog(self.domain, 'abc123', f'WebAppsLogin*{i}', date=self.within_past_day)

self._create_synclog(self.domain, 'abc123', 'device-id', date=self.within_past_day)

with patch('corehq.apps.ota.utils.DEVICES_PER_USER', 2):
self.assertTrue(can_login_on_device('abc123', 'new-device-id'))

def test_activity_older_than_a_day_is_ignored(self):
for i in range(2):
self._create_synclog(self.domain, 'abc123', f'device-{i}', date=self.over_one_day_ago)

with patch('corehq.apps.ota.utils.DEVICES_PER_USER', 1):
self.assertTrue(can_login_on_device('abc123', 'new-device-id'))

def test_either_date_or_last_submitted_counts_towards_device_count(self):
self._create_synclog(
self.domain,
'abc123',
'device-1',
date=self.over_one_day_ago,
last_submitted=self.within_past_day,
)

self._create_synclog(
self.domain,
'abc123',
'device-2',
date=self.within_past_day,
last_submitted=self.over_one_day_ago,
)

with patch('corehq.apps.ota.utils.DEVICES_PER_USER', 2):
self.assertFalse(can_login_on_device('abc123', 'new-device-id'))

def test_allowed_if_empty_queryset(self):
self.assertTrue(can_login_on_device('abc123', 'device-0'))

def test_allowed_for_different_user(self):
self._create_synclog(self.domain, 'abc123', 'device-id', date=self.within_past_day)

with patch('corehq.apps.ota.utils.DEVICES_PER_USER', 1):
self.assertTrue(can_login_on_device('def456', 'device-id'))

def _create_synclog(self, domain, user_id, device_id, **kwargs):
SyncLogSQL.objects.create(
domain=domain,
doc={},
synclog_id=uuid.uuid4(),
user_id=user_id,
device_id=device_id,
**kwargs
)
28 changes: 28 additions & 0 deletions corehq/apps/ota/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from functools import wraps
from datetime import datetime, timedelta

from django.contrib.postgres.aggregates import ArrayAgg
from django.db.models import Q
from django.utils.translation import gettext as _

from couchdbkit import ResourceConflict

from casexml.apps.case.xml import V2
from casexml.apps.phone.models import SyncLogSQL
from casexml.apps.phone.restore import RestoreConfig, RestoreParams
from dimagi.utils.logging import notify_exception
from dimagi.utils.web import json_response
Expand All @@ -19,6 +23,7 @@
from corehq.apps.users.models import CommCareUser

from .exceptions import RestorePermissionDenied
from .const import DEVICES_PER_USER
from .models import DemoUserRestore


Expand Down Expand Up @@ -219,3 +224,26 @@ def _inner(request, domain, *args, **kwargs):

return response
return _inner


def can_login_on_device(user_id, device_id):
if device_id.startswith("WebAppsLogin"):
return True

end_time = datetime.now()
start_time = end_time - timedelta(days=1)
date_query = Q(date__gte=start_time, date__lt=end_time)
last_submitted_query = Q(last_submitted__gte=start_time, last_submitted__lt=end_time)

result = (
SyncLogSQL.objects.filter(date_query | last_submitted_query, user_id=user_id)
.exclude(device_id__startswith="WebAppsLogin")
.values('user_id')
.annotate(device_ids=ArrayAgg("device_id", distinct=True))
)
device_ids = result[0]['device_ids'] if result else []

if len(device_ids) < DEVICES_PER_USER or device_id in device_ids:
return True

return False

0 comments on commit 6d87fb4

Please sign in to comment.