Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Include newly added sequences in the port DB script. (#9449)
Browse files Browse the repository at this point in the history
And ensure the consistency of `event_auth_chain_id`.
  • Loading branch information
clokep committed Feb 23, 2021
1 parent 66f4949 commit 65a9eb8
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 26 deletions.
1 change: 1 addition & 0 deletions changelog.d/9449.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in v1.26.0 where some sequences were not properly configured when running `synapse_port_db`.
65 changes: 41 additions & 24 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import logging
import sys
import time
import traceback
from typing import Dict, Optional, Set
from typing import Dict, Iterable, Optional, Set

import yaml

Expand Down Expand Up @@ -629,7 +629,13 @@ class Porter(object):
await self._setup_state_group_id_seq()
await self._setup_user_id_seq()
await self._setup_events_stream_seqs()
await self._setup_device_inbox_seq()
await self._setup_sequence(
"device_inbox_sequence", ("device_inbox", "device_federation_outbox")
)
await self._setup_sequence(
"account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
await self._setup_auth_chain_sequence()

# Step 3. Get tables.
self.progress.set_state("Fetching tables")
Expand Down Expand Up @@ -854,7 +860,7 @@ class Porter(object):

return done, remaining + done

async def _setup_state_group_id_seq(self):
async def _setup_state_group_id_seq(self) -> None:
curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
)
Expand All @@ -868,7 +874,7 @@ class Porter(object):

await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)

async def _setup_user_id_seq(self):
async def _setup_user_id_seq(self) -> None:
curr_id = await self.sqlite_store.db_pool.runInteraction(
"setup_user_id_seq", find_max_generated_user_id_localpart
)
Expand All @@ -877,9 +883,9 @@ class Porter(object):
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))

return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)

async def _setup_events_stream_seqs(self):
async def _setup_events_stream_seqs(self) -> None:
"""Set the event stream sequences to the correct values.
"""

Expand Down Expand Up @@ -908,35 +914,46 @@ class Porter(object):
(curr_backward_id + 1,),
)

return await self.postgres_store.db_pool.runInteraction(
await self.postgres_store.db_pool.runInteraction(
"_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
)

async def _setup_device_inbox_seq(self):
"""Set the device inbox sequence to the correct value.
async def _setup_sequence(self, sequence_name: str, stream_id_tables: Iterable[str]) -> None:
"""Set a sequence to the correct value.
"""
curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="device_inbox",
keyvalues={},
retcol="COALESCE(MAX(stream_id), 1)",
allow_none=True,
)
current_stream_ids = []
for stream_id_table in stream_id_tables:
max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table=stream_id_table,
keyvalues={},
retcol="COALESCE(MAX(stream_id), 1)",
allow_none=True,
)
current_stream_ids.append(max_stream_id)

curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="device_federation_outbox",
keyvalues={},
retcol="COALESCE(MAX(stream_id), 1)",
allow_none=True,
)
next_id = max(current_stream_ids) + 1

def r(txn):
sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name, )
txn.execute(sql + " %s", (next_id, ))

next_id = max(curr_local_id, curr_federation_id) + 1
await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r)

async def _setup_auth_chain_sequence(self) -> None:
curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="event_auth_chains", keyvalues={}, retcol="MAX(chain_id)", allow_none=True
)

def r(txn):
txn.execute(
"ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,)
"ALTER SEQUENCE event_auth_chain_id RESTART WITH %s",
(curr_chain_id,),
)

return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r)
await self.postgres_store.db_pool.runInteraction(
"_setup_event_auth_chain_id", r,
)



##############################################
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def __init__(self, main_store_class, hs):
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
if hs.get_instance_name() in hs.config.worker.writers.events:
persist_events = PersistEventsStore(hs, database, main)
persist_events = PersistEventsStore(hs, database, main, db_conn)

if "state" in database_config.databases:
logger.info(
Expand Down
13 changes: 12 additions & 1 deletion synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import StateMap, get_domain_from_id
from synapse.util import json_encoder
Expand Down Expand Up @@ -90,7 +91,11 @@ class PersistEventsStore:
"""

def __init__(
self, hs: "HomeServer", db: DatabasePool, main_data_store: "DataStore"
self,
hs: "HomeServer",
db: DatabasePool,
main_data_store: "DataStore",
db_conn: Connection,
):
self.hs = hs
self.db_pool = db
Expand All @@ -109,6 +114,12 @@ def __init__(
) # type: MultiWriterIdGenerator
self._stream_id_gen = self.store._stream_id_gen # type: MultiWriterIdGenerator

# The consistency of this cannot be checked when the ID generator is
# created since the database might not yet be up-to-date.
self.db_pool.event_chain_id_gen.check_consistency(
db_conn, "event_auth_chains", "chain_id" # type: ignore
)

# This should only exist on instances that are configured to write
assert (
hs.get_instance_name() in hs.config.worker.writers.events
Expand Down

0 comments on commit 65a9eb8

Please sign in to comment.