Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Disable bytes usage with postgres #6186

Merged
merged 7 commits into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6186.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where we were updating censored events as bytes rather than text, occaisonally causing invalid JSON being inserted breaking APIs that attempted to fetch such events.
7 changes: 7 additions & 0 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ class PostgresEngine(object):
def __init__(self, database_module, database_config):
self.module = database_module
self.module.extensions.register_type(self.module.extensions.UNICODE)

# Disables passing `bytes` to txn.execute, c.f. #6186. If you do
# actually want to use bytes than wrap it in `bytearray`.
def _disable_bytes_adapter(_):
raise Exception("Passing bytes to DB is disabled.")

self.module.extensions.register_adapter(bytes, _disable_bytes_adapter)
self.synchronous_commit = database_config.get("synchronous_commit", True)
self._version = None # unknown as yet

Expand Down
43 changes: 43 additions & 0 deletions synapse/storage/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ def __init__(self, db_conn, hs):
"redactions_received_ts", self._redactions_received_ts
)

# This index gets deleted in `event_fix_redactions_bytes` update
self.register_background_index_update(
"event_fix_redactions_bytes_create_index",
index_name="redactions_censored_redacts",
table="redactions",
columns=["redacts"],
where_clause="have_censored",
)

self.register_background_update_handler(
"event_fix_redactions_bytes", self._event_fix_redactions_bytes
)

@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
Expand Down Expand Up @@ -458,3 +471,33 @@ def _redactions_received_ts_txn(txn):
yield self._end_background_update("redactions_received_ts")

return count

@defer.inlineCallbacks
def _event_fix_redactions_bytes(self, progress, batch_size):
"""Undoes hex encoded censored redacted event JSON.
"""

def _event_fix_redactions_bytes_txn(txn):
# This update is quite fast due to new index.
txn.execute(
"""
UPDATE event_json
SET
json = convert_from(json::bytea, 'utf8')
FROM redactions
WHERE
redactions.have_censored
AND event_json.event_id = redactions.redacts
AND json NOT LIKE '{%';
"""
)

txn.execute("DROP INDEX redactions_censored_redacts")

yield self.runInteraction(
"_event_fix_redactions_bytes", _event_fix_redactions_bytes_txn
)

yield self._end_background_update("event_fix_redactions_bytes")

return 1
4 changes: 2 additions & 2 deletions synapse/storage/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def _do_txn(txn):
"SELECT filter_id FROM user_filters "
"WHERE user_id = ? AND filter_json = ?"
)
txn.execute(sql, (user_localpart, def_json))
txn.execute(sql, (user_localpart, bytearray(def_json)))
filter_id_response = txn.fetchone()
if filter_id_response is not None:
return filter_id_response[0]
Expand All @@ -68,7 +68,7 @@ def _do_txn(txn):
"INSERT INTO user_filters (user_id, filter_id, filter_json)"
"VALUES(?, ?, ?)"
)
txn.execute(sql, (user_localpart, filter_id, def_json))
txn.execute(sql, (user_localpart, filter_id, bytearray(def_json)))

return filter_id

Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def add_pusher(
"device_display_name": device_display_name,
"ts": pushkey_ts,
"lang": lang,
"data": encode_canonical_json(data),
"data": bytearray(encode_canonical_json(data)),
"last_stream_ordering": last_stream_ordering,
"profile_tag": profile_tag,
"id": stream_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@


-- There was a bug where we may have updated censored redactions as bytes,
-- which can (somehow) cause json to be inserted hex encoded. This goes and
-- undoes any such hex encoded JSON.
UPDATE event_json SET json = convert_from(json::bytea, 'utf8')
WHERE event_id IN (
SELECT event_json.event_id
FROM event_json
INNER JOIN redactions ON (event_json.event_id = redacts)
WHERE have_censored AND json NOT LIKE '{%'
);
-- which can (somehow) cause json to be inserted hex encoded. These updates go
-- and undoes any such hex encoded JSON.

INSERT into background_updates (update_name, progress_json)
VALUES ('event_fix_redactions_bytes_create_index', '{}');

INSERT into background_updates (update_name, progress_json, depends_on)
VALUES ('event_fix_redactions_bytes', '{}', 'event_fix_redactions_bytes_create_index');