-
Notifications
You must be signed in to change notification settings - Fork 509
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core] Turn on WAL mode for cluster job table #3923
[Core] Turn on WAL mode for cluster job table #3923
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for submitting the PR @wizenheimer! Are we able to reproduce the issue mentioned in #3863? It would be great if we can include a reproduction in the PR description, which fails on master and succeed on this PR. : )
sky/utils/db_utils.py
Outdated
@@ -83,4 +106,5 @@ def __init__(self, db_path: str, create_table: Callable): | |||
# errors. This is a hack, but it works. | |||
self.conn = sqlite3.connect(db_path, timeout=10) | |||
self.cursor = self.conn.cursor() | |||
enable_wal_mode(self.conn) # Enable WAL mode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of having this for all table creation, let's keep it in creat_table
function as what we did in https://github.com/skypilot-org/skypilot/blob/master/sky/global_user_state.py#L41-L48
sky/skylet/job_lib.py
Outdated
def db_operation(): | ||
rows = _CURSOR.execute('SELECT status FROM jobs WHERE job_id=(?)', | ||
(job_id,)) | ||
for (status,) in rows: | ||
if status is None: | ||
return None | ||
return JobStatus(status) | ||
return None | ||
|
||
return db_utils.retry_on_database_locked(db_operation) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's leave the retry on locked out to keep this PR only about WAL.
Hey @Michaelvll, # attack_db.py - Simulate a multi-threaded attack on the SkyPilot SQLite database
import sqlite3
import time
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
# Path to the SkyPilot SQLite database
DB_PATH = os.path.expanduser("~/.sky/jobs.db")
def run_database_operations(thread_id):
for i in range(
100
): # Perform 100 operations per thread. Michaelvll: This can be increased/decreased for a more realistic simulation. But tuning timeouts could yield a better signal.
try:
# Use a very short timeout to increase chances of locking. Michaelvll this could be increased/decreased for a more realistic simulation.
# At 0.1 seconds = 100ms, this is a very short timeout. But master raises more errors compared to current branch.
# At 0.5 seconds = 500ms, this is a short timeout. But master raises errors and current branch raises no errors.
# At 1.0 seconds = 1000ms, this is a long timeout. But master raises no errors and current branch raises no errors.
conn = sqlite3.connect(DB_PATH, timeout=0.5)
cursor = conn.cursor()
# Simulate a write operation (inserting a job)
# Michaelvll: I have added a sleep to increase the chances of contention. These queries are sourced from the SkyPilot codebase.
cursor.execute(
"""
INSERT INTO jobs (job_name, username, submitted_at, status, run_timestamp)
VALUES (?, ?, ?, ?, ?)
""",
(
f"test_job_{thread_id}_{i}",
"test_user",
time.time(),
"PENDING",
str(time.time()),
),
)
# Simulate a read operation (querying job status)
cursor.execute(
"SELECT status FROM jobs WHERE job_name = ?",
(f"test_job_{thread_id}_{i}",),
)
# Simulate an update operation (updating job status)
cursor.execute(
"UPDATE jobs SET status = ? WHERE job_name = ?",
("RUNNING", f"test_job_{thread_id}_{i}"),
)
# Simulate a delete operation (removing the job)
cursor.execute(
"DELETE FROM jobs WHERE job_name = ?", (f"test_job_{thread_id}_{i}",)
)
conn.commit()
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
print(f"Thread {thread_id}: Caught database lock error: {e}")
else:
print(f"Thread {thread_id}: Other SQLite error: {e}")
except Exception as e:
print(f"Thread {thread_id}: Unexpected error: {e}")
finally:
if conn:
conn.close()
time.sleep(0.01) # Adding a small delay to increase chances of contention
def main():
num_threads = 20
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [
executor.submit(run_database_operations, i) for i in range(num_threads)
]
for future in as_completed(futures):
future.result()
if __name__ == "__main__":
main() Steps:
Additional Note:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for adding this @wizenheimer and sorry for the delay! LGTM.
Issues Addressed
Changes Made
Tested (run the relevant ones):
bash format.sh
pytest tests/test_smoke.py
pytest tests/test_smoke.py::test_fill_in_the_name
conda deactivate; bash -i tests/backward_compatibility_tests.sh