Skip to content

Commit

Permalink
Revert "athenad: fix thread safety issues in upload handing" (#34224)
Browse files Browse the repository at this point in the history
Revert "athenad: fix thread safety issues in upload handing (#34199)"

This reverts commit dcb3113.
  • Loading branch information
sshane authored Dec 12, 2024
1 parent c1ae9ea commit cd6d9fe
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 27 deletions.
47 changes: 22 additions & 25 deletions system/athena/athenad.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ def from_dict(cls, d: dict) -> UploadItem:
upload_queue: Queue[UploadItem] = queue.Queue()
low_priority_send_queue: Queue[str] = queue.Queue()
log_recv_queue: Queue[str] = queue.Queue()
cancelled_uploads: set[str] = set()

cur_upload_items: dict[int, UploadItem | None] = {}
cur_upload_items_lock = threading.Lock()


def strip_zst_extension(fn: str) -> str:
Expand Down Expand Up @@ -130,9 +130,8 @@ def initialize(upload_queue: Queue[UploadItem]) -> None:
@staticmethod
def cache(upload_queue: Queue[UploadItem]) -> None:
try:
with upload_queue.mutex:
items = [asdict(item) for item in upload_queue.queue]

queue: list[UploadItem | None] = list(upload_queue.queue)
items = [asdict(i) for i in queue if i is not None and (i.id not in cancelled_uploads)]
Params().put("AthenadUploadQueue", json.dumps(items))
except Exception:
cloudlog.exception("athena.UploadQueueCache.cache.exception")
Expand Down Expand Up @@ -199,13 +198,11 @@ def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = Tr
progress=0,
current=False
)

with cur_upload_items_lock:
upload_queue.put_nowait(item)
cur_upload_items[tid] = None

upload_queue.put_nowait(item)
UploadQueueCache.cache(upload_queue)

cur_upload_items[tid] = None

for _ in range(RETRY_DELAY):
time.sleep(1)
if end_event.is_set():
Expand All @@ -224,19 +221,22 @@ def cb(sm, item, tid, end_event: threading.Event, sz: int, cur: int) -> None:
if end_event.is_set():
raise AbortTransferException

with cur_upload_items_lock:
cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1)
cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1)


def upload_handler(end_event: threading.Event) -> None:
sm = messaging.SubMaster(['deviceState'])
tid = threading.get_ident()

while not end_event.is_set():
cur_upload_items[tid] = None

try:
with cur_upload_items_lock:
cur_upload_items[tid] = None
cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True)
cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True)

if item.id in cancelled_uploads:
cancelled_uploads.remove(item.id)
continue

# Remove item if too old
age = datetime.now() - datetime.fromtimestamp(item.created_at / 1000)
Expand Down Expand Up @@ -415,25 +415,22 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo

@dispatcher.add_method
def listUploadQueue() -> list[UploadItemDict]:
with cur_upload_items_lock, upload_queue.mutex:
items = list(upload_queue.queue) + [item for item in cur_upload_items.values() if item is not None]

return [asdict(item) for item in items]
items = list(upload_queue.queue) + list(cur_upload_items.values())
return [asdict(i) for i in items if (i is not None) and (i.id not in cancelled_uploads)]


@dispatcher.add_method
def cancelUpload(upload_id: str | list[str]) -> dict[str, int | str]:
if not isinstance(upload_id, list):
upload_id = [upload_id]

with upload_queue.mutex:
remaining_items = [item for item in upload_queue.queue if item.id not in upload_id]
if len(remaining_items) == len(upload_queue.queue):
return {"success": 0, "error": "not found"}
uploading_ids = {item.id for item in list(upload_queue.queue)}
cancelled_ids = uploading_ids.intersection(upload_id)
if len(cancelled_ids) == 0:
return {"success": 0, "error": "not found"}

upload_queue.queue.clear()
upload_queue.queue.extend(remaining_items)
return {"success": 1}
cancelled_uploads.update(cancelled_ids)
return {"success": 1}

@dispatcher.add_method
def setRouteViewed(route: str) -> dict[str, int | str]:
Expand Down
8 changes: 6 additions & 2 deletions system/athena/tests/test_athenad.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def setup_method(self):

athenad.upload_queue = queue.Queue()
athenad.cur_upload_items.clear()
athenad.cancelled_uploads.clear()

for i in os.listdir(Paths.log_root()):
p = os.path.join(Paths.log_root(), i)
Expand Down Expand Up @@ -281,10 +282,13 @@ def test_cancel_upload(self):
athenad.upload_queue.put_nowait(item)
dispatcher["cancelUpload"](item.id)

assert item.id in athenad.cancelled_uploads

self._wait_for_upload()
time.sleep(0.1)

assert athenad.upload_queue.qsize() == 0
assert len(athenad.cancelled_uploads) == 0

@with_upload_handler
def test_cancel_expiry(self):
Expand Down Expand Up @@ -327,7 +331,7 @@ def test_list_upload_queue(self):
assert items[0] == asdict(item)
assert not items[0]['current']

dispatcher["cancelUpload"](item.id)
athenad.cancelled_uploads.add(item.id)
items = dispatcher["listUploadQueue"]()
assert len(items) == 0

Expand All @@ -339,7 +343,7 @@ def test_upload_queue_persistence(self):
athenad.upload_queue.put_nowait(item2)

# Ensure canceled items are not persisted
dispatcher["cancelUpload"](item2.id)
athenad.cancelled_uploads.add(item2.id)

# serialize item
athenad.UploadQueueCache.cache(athenad.upload_queue)
Expand Down

0 comments on commit cd6d9fe

Please sign in to comment.