This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Require SQLite >= 3.27.0 (#13760)
David Robertson authored Sep 9, 2022
1 parent 69fa297 commit f2d2481
Showing 9 changed files with 106 additions and 208 deletions.
1 change: 1 addition & 0 deletions changelog.d/13760.removal
@@ -0,0 +1 @@
Synapse will now refuse to start if configured to use SQLite < 3.27.
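
The engine-level check that enforces this requirement presumably lives in the SQLite engine module, one of the changed files not rendered in this view. As a rough illustration only, and not Synapse's actual code, a startup guard of this kind can be written against the standard-library sqlite3 module:

import sqlite3

# Minimum version named by the changelog entry above.
MINIMUM_SQLITE_VERSION = (3, 27, 0)

def check_sqlite_version() -> None:
    """Illustrative guard: refuse to start if the linked SQLite library is too old."""
    if sqlite3.sqlite_version_info < MINIMUM_SQLITE_VERSION:
        raise RuntimeError(
            "Synapse requires SQLite 3.27.0 or above; found %s" % sqlite3.sqlite_version
        )
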
47 changes: 21 additions & 26 deletions synapse/storage/database.py
@@ -533,15 +533,14 @@ def __init__(
if isinstance(self.engine, Sqlite3Engine):
self._unsafe_to_upsert_tables.add("user_directory_search")

if self.engine.can_native_upsert:
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
self._clock.call_later(
0.0,
run_as_background_process,
"upsert_safety_check",
self._check_safe_to_upsert,
)
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
self._clock.call_later(
0.0,
run_as_background_process,
"upsert_safety_check",
self._check_safe_to_upsert,
)

def name(self) -> str:
"Return the name of this database"
@@ -1160,11 +1159,8 @@ async def simple_upsert(
attempts = 0
while True:
try:
# We can autocommit if we are going to use native upserts
autocommit = (
self.engine.can_native_upsert
and table not in self._unsafe_to_upsert_tables
)
# We can autocommit if it is safe to upsert
autocommit = table not in self._unsafe_to_upsert_tables

return await self.runInteraction(
desc,
@@ -1199,22 +1195,23 @@ def simple_upsert_txn(
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method.
Args:
txn: The transaction to use.
table: The table to upsert into
keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
lock: True to lock the table when doing the upsert.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}

if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_txn_native_upsert(
txn, table, keyvalues, values, insertion_values=insertion_values
)
@@ -1365,14 +1362,12 @@ async def simple_upsert_many(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused if the database engine
supports native upserts.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
"""

# We can autocommit if we are going to use native upserts
autocommit = (
self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
)
# We can autocommit if it is safe to upsert
autocommit = table not in self._unsafe_to_upsert_tables

await self.runInteraction(
desc,
@@ -1406,10 +1401,10 @@ def simple_upsert_many_txn(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused if the database engine
supports native upserts.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
"""
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
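
With the can_native_upsert checks removed above, the upsert helpers dispatch purely on whether the table is still listed in _unsafe_to_upsert_tables. For orientation, here is a hedged usage sketch of the simple_upsert wrapper; the table and column names are hypothetical, and the call assumes a store whose db_pool attribute is the DatabasePool modified in this file:

# Hypothetical call site: insert-or-update a per-user setting in one statement.
await self.db_pool.simple_upsert(
    table="example_user_settings",         # hypothetical table name
    keyvalues={"user_id": user_id},        # unique key identifying the row
    values={"theme": "dark"},              # overwritten whether inserting or updating
    insertion_values={"created_ts": now},  # only applied when inserting a new row
    desc="set_user_theme",
)
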
121 changes: 39 additions & 82 deletions synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ async def _try_acquire_lock(
now = self._clock.time_msec()
token = random_string(6)

if self.db_pool.engine.can_native_upsert:

def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
# We take out the lock if either a) there is no row for the lock
# already, b) the existing row has timed out, or c) the row is
# for this instance (which means the process got killed and
# restarted)
sql = """
INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (lock_name, lock_key)
DO UPDATE
SET
token = EXCLUDED.token,
instance_name = EXCLUDED.instance_name,
last_renewed_ts = EXCLUDED.last_renewed_ts
WHERE
worker_locks.last_renewed_ts < ?
OR worker_locks.instance_name = EXCLUDED.instance_name
"""
txn.execute(
sql,
(
lock_name,
lock_key,
self._instance_name,
token,
now,
now - _LOCK_TIMEOUT_MS,
),
)

# We only acquired the lock if we inserted or updated the table.
return bool(txn.rowcount)

did_lock = await self.db_pool.runInteraction(
"try_acquire_lock",
_try_acquire_lock_txn,
# We can autocommit here as we're executing a single query, this
# will avoid serialization errors.
db_autocommit=True,
def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
# We take out the lock if either a) there is no row for the lock
# already, b) the existing row has timed out, or c) the row is
# for this instance (which means the process got killed and
# restarted)
sql = """
INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (lock_name, lock_key)
DO UPDATE
SET
token = EXCLUDED.token,
instance_name = EXCLUDED.instance_name,
last_renewed_ts = EXCLUDED.last_renewed_ts
WHERE
worker_locks.last_renewed_ts < ?
OR worker_locks.instance_name = EXCLUDED.instance_name
"""
txn.execute(
sql,
(
lock_name,
lock_key,
self._instance_name,
token,
now,
now - _LOCK_TIMEOUT_MS,
),
)
if not did_lock:
return None

else:
# If we're on an old SQLite we emulate the above logic by first
# clearing out any existing stale locks and then upserting.

def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
sql = """
DELETE FROM worker_locks
WHERE
lock_name = ?
AND lock_key = ?
AND (last_renewed_ts < ? OR instance_name = ?)
"""
txn.execute(
sql,
(lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
)

inserted = self.db_pool.simple_upsert_txn_emulated(
txn,
table="worker_locks",
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
},
values={},
insertion_values={
"token": token,
"last_renewed_ts": self._clock.time_msec(),
"instance_name": self._instance_name,
},
)

return inserted

did_lock = await self.db_pool.runInteraction(
"try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
)
# We only acquired the lock if we inserted or updated the table.
return bool(txn.rowcount)

if not did_lock:
return None
did_lock = await self.db_pool.runInteraction(
"try_acquire_lock",
_try_acquire_lock_txn,
# We can autocommit here as we're executing a single query, this
# will avoid serialization errors.
db_autocommit=True,
)
if not did_lock:
return None

lock = Lock(
self._reactor,
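
The emulated branch deleted above is no longer needed because every supported SQLite understands native INSERT ... ON CONFLICT ... DO UPDATE ... WHERE (available since SQLite 3.24). The following self-contained sketch mirrors the conditional-acquire logic of _try_acquire_lock_txn on a deliberately simplified table; it illustrates the SQL shape and is not Synapse's worker_locks schema:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE locks (name TEXT PRIMARY KEY, owner TEXT, renewed_ts INTEGER)"
)

def try_acquire(owner: str, now_ms: int, timeout_ms: int = 30_000) -> bool:
    cur = conn.execute(
        """
        INSERT INTO locks (name, owner, renewed_ts) VALUES ('lock', ?, ?)
        ON CONFLICT (name) DO UPDATE SET
            owner = EXCLUDED.owner,
            renewed_ts = EXCLUDED.renewed_ts
        WHERE locks.renewed_ts < ? OR locks.owner = EXCLUDED.owner
        """,
        (owner, now_ms, now_ms - timeout_ms),
    )
    # rowcount is non-zero only if we inserted a row or the DO UPDATE fired,
    # i.e. only if we actually acquired (or re-acquired) the lock.
    return bool(cur.rowcount)

print(try_acquire("worker1", 1_000_000))  # True: no existing row, so we insert
print(try_acquire("worker2", 1_005_000))  # False: worker1's lock is still fresh
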
86 changes: 34 additions & 52 deletions synapse/storage/databases/main/stats.py
@@ -446,59 +446,41 @@ def _upsert_with_additive_relatives_txn(
absolutes: Absolute (set) fields
additive_relatives: Fields that will be added onto if existing row present.
"""
if self.database_engine.can_native_upsert:
absolute_updates = [
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
for field in absolutes.keys()
]

relative_updates = [
"%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
% {"table": table, "field": field}
for field in additive_relatives.keys()
]

insert_cols = []
qargs = []

for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
):
insert_cols.append(key)
qargs.append(val)
absolute_updates = [
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
for field in absolutes.keys()
]

relative_updates = [
"%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
% {"table": table, "field": field}
for field in additive_relatives.keys()
]

insert_cols = []
qargs = []

for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
):
insert_cols.append(key)
qargs.append(val)

sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
"key_columns": ", ".join(keyvalues),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}

sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
"key_columns": ", ".join(keyvalues),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}

txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, table)
retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
current_row = self.db_pool.simple_select_one_txn(
txn, table, keyvalues, retcols, allow_none=True
)
if current_row is None:
merged_dict = {**keyvalues, **absolutes, **additive_relatives}
self.db_pool.simple_insert_txn(txn, table, merged_dict)
else:
for (key, val) in additive_relatives.items():
if current_row[key] is None:
current_row[key] = val
else:
current_row[key] += val
current_row.update(absolutes)
self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
txn.execute(sql, qargs)

async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
"""Calculate and insert an entry into room_stats_current.
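
To make the templating above concrete, this is the shape of statement it produces for one hypothetical call; the table and column names are invented for illustration, and the parameters mirror keyvalues, absolutes and additive_relatives in order:

# Illustration only, for a hypothetical call with
#   keyvalues={"room_id": "!abc:example.org"}
#   absolutes={"topic": "General"}
#   additive_relatives={"joined_members": 1}
# against an invented table named room_stats_example.
sql = """
    INSERT INTO room_stats_example (room_id, topic, joined_members)
    VALUES (?, ?, ?)
    ON CONFLICT (room_id) DO UPDATE SET
        topic = EXCLUDED.topic,
        joined_members = EXCLUDED.joined_members
            + COALESCE(room_stats_example.joined_members, 0)
"""
qargs = ["!abc:example.org", "General", 1]
# Absolute columns are simply overwritten; additive columns are added onto the
# existing value, with COALESCE treating a missing value as zero.
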
30 changes: 9 additions & 21 deletions synapse/storage/databases/main/transactions.py
@@ -221,25 +221,15 @@ async def set_destination_retry_timings(
retry_interval: how long until next retry in ms
"""

if self.database_engine.can_native_upsert:
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_native,
destination,
failure_ts,
retry_last_ts,
retry_interval,
db_autocommit=True, # Safe as its a single upsert
)
else:
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_emulated,
destination,
failure_ts,
retry_last_ts,
retry_interval,
)
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_native,
destination,
failure_ts,
retry_last_ts,
retry_interval,
db_autocommit=True, # Safe as it's a single upsert
)

def _set_destination_retry_timings_native(
self,
@@ -249,8 +239,6 @@ def _set_destination_retry_timings_native(
retry_last_ts: int,
retry_interval: int,
) -> None:
assert self.database_engine.can_native_upsert

# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.
#
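
The body of _set_destination_retry_timings_native is collapsed in this view. As a hedged sketch only, with column names assumed from the method's parameters rather than copied from Synapse, the conditional upsert described by the comment above looks roughly like:

# Rough sketch, not Synapse's actual statement: only adopt the new retry_interval
# when it is a reset (zero) or an increase over whatever is already stored.
sql = """
    INSERT INTO destinations (destination, failure_ts, retry_last_ts, retry_interval)
    VALUES (?, ?, ?, ?)
    ON CONFLICT (destination) DO UPDATE SET
        failure_ts = EXCLUDED.failure_ts,
        retry_last_ts = EXCLUDED.retry_last_ts,
        retry_interval = EXCLUDED.retry_interval
    WHERE
        EXCLUDED.retry_interval = 0
        OR destinations.retry_interval IS NULL
        OR destinations.retry_interval < EXCLUDED.retry_interval
"""
# txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
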
8 changes: 0 additions & 8 deletions synapse/storage/engines/_base.py
@@ -43,14 +43,6 @@ def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]):
def single_threaded(self) -> bool:
...

@property
@abc.abstractmethod
def can_native_upsert(self) -> bool:
"""
Do we support native UPSERTs?
"""
...

@property
@abc.abstractmethod
def supports_using_any_list(self) -> bool:
7 changes: 0 additions & 7 deletions synapse/storage/engines/postgres.py
@@ -158,13 +158,6 @@ def on_new_connection(self, db_conn: "LoggingDatabaseConnection") -> None:
cursor.close()
db_conn.commit()

@property
def can_native_upsert(self) -> bool:
"""
Can we use native UPSERTs?
"""
return True

@property
def supports_using_any_list(self) -> bool:
"""Do we support using `a = ANY(?)` and passing a list"""