Skip to content

Commit

Permalink
fix(storage_controller): fix race condition during shutdown (#1073)
Browse files Browse the repository at this point in the history
  • Loading branch information
vringar authored Dec 5, 2023
1 parent 2fb2a2d commit ac36c40
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions openwpm/storage/storage_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,25 @@ async def finalize_visit_id(
documentation
"""

# If the following critical section contains any await statement
# we can run into race conditions as reported by https://github.com/openwpm/OpenWPM/issues/1068
# By popping the tasks off we won't try to await them again in self.shutdown

# THIS IS A CRITITCAL SECTION
if visit_id not in self.store_record_tasks:
self.logger.error(
"There are no records to be stored for visit_id %d, skipping...",
visit_id,
)
return None

store_record_tasks = self.store_record_tasks.pop(visit_id)
# END OF CRITICAL SECTION

self.logger.info("Awaiting all tasks for visit_id %d", visit_id)
for task in self.store_record_tasks[visit_id]:
for task in store_record_tasks:
await task
del self.store_record_tasks[visit_id]

self.logger.debug(
"Awaited all tasks for visit_id %d while finalizing", visit_id
)
Expand Down Expand Up @@ -255,13 +263,17 @@ async def shutdown(self, completion_queue_task: Task[None]) -> None:
completion_tokens = {}
visit_ids = list(self.store_record_tasks.keys())
for visit_id in visit_ids:
t = await self.finalize_visit_id(visit_id, success=False)
if t is not None:
completion_tokens[visit_id] = t
# Even if the token is None, we still want to put the visit_id
# in the completion queue
completion_tokens[visit_id] = await self.finalize_visit_id(
visit_id, success=False
)

await self.structured_storage.flush_cache()
await completion_queue_task
for visit_id, token in completion_tokens.items():
await token
if token:
await token
self.completion_queue.put((visit_id, False))

await self.structured_storage.shutdown()
Expand Down

0 comments on commit ac36c40

Please sign in to comment.