Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add a consistency check on events read from the database #12620

Merged: 2 commits were merged into the base branch on May 3, 2022.
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12620.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a consistency check on events which we read from the database.
12 changes: 12 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,18 @@ async def _get_events_from_db(
original_ev.internal_metadata.stream_ordering = row.stream_ordering
original_ev.internal_metadata.outlier = row.outlier

# Consistency check: if the content of the event has been modified in the
# database, then the calculated event ID will not match the event id in the
# database.
if original_ev.event_id != event_id:
# it's difficult to see what to do here. Pretty much all bets are off
# if Synapse cannot rely on the consistency of its database.
raise RuntimeError(
f"Database corruption: Event {event_id} in room {d['room_id']} "
f"from the database appears to have been modified (calculated "
f"event id {original_ev.event_id})"
)

event_map[event_id] = original_ev

# finally, we can decide whether each one needs redacting, and build
Expand Down
59 changes: 37 additions & 22 deletions tests/storage/databases/main/test_events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
# limitations under the License.
import json
from contextlib import contextmanager
from typing import Generator, Tuple
from typing import Generator, List, Tuple
from unittest import mock

from twisted.enterprise.adbapi import ConnectionPool
from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
from twisted.test.proto_helpers import MemoryReactor

from synapse.api.room_versions import EventFormatVersions, RoomVersions
from synapse.events import make_event_from_dict
from synapse.logging.context import LoggingContext
from synapse.rest import admin
from synapse.rest.client import login, room
Expand Down Expand Up @@ -49,23 +50,28 @@ def prepare(self, reactor, clock, hs):
)
)

for idx, (rid, eid) in enumerate(
self.event_ids: List[str] = []
for idx, rid in enumerate(
(
("room1", "event10"),
("room1", "event11"),
("room1", "event12"),
("room2", "event20"),
"room1",
"room1",
"room1",
"room2",
)
):
event_json = {"type": f"test {idx}", "room_id": rid}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id

self.get_success(
self.store.db_pool.simple_insert(
"events",
{
"event_id": eid,
"event_id": event_id,
"room_id": rid,
"topological_ordering": idx,
"stream_ordering": idx,
"type": "test",
"type": event.type,
"processed": True,
"outlier": False,
},
Expand All @@ -75,41 +81,44 @@ def prepare(self, reactor, clock, hs):
self.store.db_pool.simple_insert(
"event_json",
{
"event_id": eid,
"event_id": event_id,
"room_id": rid,
"json": json.dumps({"type": "test", "room_id": rid}),
"json": json.dumps(event_json),
"internal_metadata": "{}",
"format_version": 3,
},
)
)
self.event_ids.append(event_id)

def test_simple(self):
    """A first `have_seen_events` lookup hits the database once; a repeat is cached."""
    # One event we inserted in `prepare`, plus one that does not exist.
    query = [self.event_ids[0], "event19"]

    with LoggingContext(name="test") as log_ctx:
        seen = self.get_success(self.store.have_seen_events("room1", query))
        self.assertEqual(seen, {self.event_ids[0]})

        # the lookup should have been answered with exactly one db transaction
        self.assertEqual(log_ctx.get_resource_usage().db_txn_count, 1)

    # repeating the identical lookup should be served entirely from the cache
    with LoggingContext(name="test") as log_ctx:
        seen = self.get_success(self.store.have_seen_events("room1", query))
        self.assertEqual(seen, {self.event_ids[0]})
        self.assertEqual(log_ctx.get_resource_usage().db_txn_count, 0)

def test_query_via_event_cache(self):
    """`have_seen_events` for an already-cached event should not touch the db."""
    # warm the event cache with a single known event
    cached_id = self.event_ids[0]
    self.get_success(self.store.get_event(cached_id))

    # the subsequent existence check must be answered without any db transaction
    with LoggingContext(name="test") as log_ctx:
        seen = self.get_success(
            self.store.have_seen_events("room1", [cached_id])
        )
        self.assertEqual(seen, {cached_id})
        self.assertEqual(log_ctx.get_resource_usage().db_txn_count, 0)


Expand Down Expand Up @@ -167,7 +176,6 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
self.store: EventsWorkerStore = hs.get_datastores().main

self.room_id = f"!room:{hs.hostname}"
self.event_ids = [f"event{i}" for i in range(20)]

self._populate_events()

Expand All @@ -190,8 +198,14 @@ def _populate_events(self) -> None:
)
)

self.event_ids = [f"event{i}" for i in range(20)]
for idx, event_id in enumerate(self.event_ids):
self.event_ids: List[str] = []
for idx in range(20):
event_json = {
"type": f"test {idx}",
"room_id": self.room_id,
}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id
self.get_success(
self.store.db_pool.simple_upsert(
"events",
Expand All @@ -201,7 +215,7 @@ def _populate_events(self) -> None:
"room_id": self.room_id,
"topological_ordering": idx,
"stream_ordering": idx,
"type": "test",
"type": event.type,
"processed": True,
"outlier": False,
},
Expand All @@ -213,12 +227,13 @@ def _populate_events(self) -> None:
{"event_id": event_id},
{
"room_id": self.room_id,
"json": json.dumps({"type": "test", "room_id": self.room_id}),
"json": json.dumps(event_json),
"internal_metadata": "{}",
"format_version": EventFormatVersions.V3,
},
)
)
self.event_ids.append(event_id)

@contextmanager
def _outage(self) -> Generator[None, None, None]:
Expand Down